You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/09 12:02:34 UTC
[9/9] flink git commit: [FLINK-5824] (followup) Minor code cleanups
in CheckForbiddenMethodsUsage
[FLINK-5824] (followup) Minor code cleanups in CheckForbiddenMethodsUsage
- Move to maual tests package
- Adjust order of methods (ctor, instance, class)
- Replace Guava with Java Util
- Make charset in SimpleStringSchema configurable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ab91cc5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ab91cc5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ab91cc5
Branch: refs/heads/master
Commit: 3ab91cc5d233265b7c4a1497e7294c9065261dec
Parents: 53fedbd
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 8 16:10:52 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:56 2017 +0100
----------------------------------------------------------------------
.../util/serialization/SimpleStringSchema.java | 61 ++++++++-
.../serialization/SimpleStringSchemaTest.java | 51 ++++++++
.../test/manual/CheckForbiddenMethodsUsage.java | 126 +++++++++++++++++++
.../test/misc/CheckForbiddenMethodsUsage.java | 115 -----------------
4 files changed, 235 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index 7f5a841..ddc55a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -20,19 +20,59 @@ package org.apache.flink.streaming.util.serialization;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.ConfigConstants;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Very simple serialization schema for strings.
+ *
+ * <p>By default, the serializer uses "UTF-8" for string/byte conversion.
*/
@PublicEvolving
public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
private static final long serialVersionUID = 1L;
+ /** The charset to use to convert between strings and bytes.
+ * The field is transient because we serialize a different delegate object instead */
+ private transient Charset charset;
+
+ /**
+ * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
+ */
+ public SimpleStringSchema() {
+ this(StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes.
+ *
+ * @param charset The charset to use to convert between strings and bytes.
+ */
+ public SimpleStringSchema(Charset charset) {
+ this.charset = checkNotNull(charset);
+ }
+
+ /**
+ * Gets the charset used by this schema for serialization.
+ * @return The charset used by this schema for serialization.
+ */
+ public Charset getCharset() {
+ return charset;
+ }
+
+ // ------------------------------------------------------------------------
+ // Kafka Serialization
+ // ------------------------------------------------------------------------
+
@Override
public String deserialize(byte[] message) {
- return new String(message, ConfigConstants.DEFAULT_CHARSET);
+ return new String(message, charset);
}
@Override
@@ -42,11 +82,26 @@ public class SimpleStringSchema implements DeserializationSchema<String>, Serial
@Override
public byte[] serialize(String element) {
- return element.getBytes(ConfigConstants.DEFAULT_CHARSET);
+ return element.getBytes(charset);
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
+
+ // ------------------------------------------------------------------------
+ // Java Serialization
+ // ------------------------------------------------------------------------
+
+ private void writeObject (ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ out.writeUTF(charset.name());
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ String charsetName = in.readUTF();
+ this.charset = Charset.forName(charsetName);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
new file mode 100644
index 0000000..74b1d18
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link SimpleStringSchema}.
+ */
+public class SimpleStringSchemaTest {
+
+ @Test
+ public void testSerializationWithAnotherCharset() {
+ final Charset charset = StandardCharsets.UTF_16BE;
+ final String string = "\u4e4b\u6383\u63cf\u53e4\u7c4d\u7248\u5be6\u4e43\u59da\u9f10\u7684";
+ final byte[] bytes = string.getBytes(charset);
+
+ assertArrayEquals(bytes, new SimpleStringSchema(charset).serialize(string));
+ assertEquals(string, new SimpleStringSchema(charset).deserialize(bytes));
+ }
+
+ @Test
+ public void testSerializability() throws Exception {
+ final SimpleStringSchema schema = new SimpleStringSchema(StandardCharsets.UTF_16LE);
+ final SimpleStringSchema copy = CommonTestUtils.createCopySerializable(schema);
+
+ assertEquals(schema.getCharset(), copy.getCharset());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
new file mode 100644
index 0000000..aabe7c0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.manual;
+
+import org.apache.flink.types.parser.FieldParserTest;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.reflections.Reflections;
+import org.reflections.scanners.MemberUsageScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Member;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests via reflection that certain methods are not called in Flink.
+ *
+ * <p>Forbidden calls include:
+ * - Byte / String conversions that do not specify an explicit charset
+ * because they produce different results in different locales
+ */
+public class CheckForbiddenMethodsUsage {
+
+ private static class ForbiddenCall {
+
+ private final Method method;
+ private final Constructor<?> constructor;
+ private final List<Member> exclusions;
+
+ private ForbiddenCall(Method method, Constructor<?> ctor, List<Member> exclusions) {
+ this.method = method;
+ this.exclusions = exclusions;
+ this.constructor = ctor;
+ }
+
+ public Method getMethod() {
+ return method;
+ }
+
+ public List<Member> getExclusions() {
+ return exclusions;
+ }
+
+ public Set<Member> getUsages(Reflections reflections) {
+ if (method == null) {
+ return reflections.getConstructorUsage(constructor);
+ }
+
+ return reflections.getMethodUsage(method);
+ }
+
+ public static ForbiddenCall of(Method method) {
+ return new ForbiddenCall(method, null, Collections.<Member>emptyList());
+ }
+
+ public static ForbiddenCall of(Method method, List<Member> exclusions) {
+ return new ForbiddenCall(method, null, exclusions);
+ }
+
+ public static ForbiddenCall of(Constructor<?> ctor) {
+ return new ForbiddenCall(null, ctor, Collections.<Member>emptyList());
+ }
+
+ public static ForbiddenCall of(Constructor<?> ctor, List<Member> exclusions) {
+ return new ForbiddenCall(null, ctor, exclusions);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final List<ForbiddenCall> forbiddenCalls = new ArrayList<>();
+
+ @BeforeClass
+ public static void init() throws Exception {
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getMethod("getBytes"),
+ Arrays.<Member>asList(
+ FieldParserTest.class.getMethod("testEndsWithDelimiter"),
+ FieldParserTest.class.getMethod("testDelimiterNext")
+ )));
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class)));
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class)));
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class)));
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class, int.class)));
+ }
+
+ @Test
+ public void testNoDefaultEncoding() throws Exception {
+ final Reflections reflections = new Reflections(new ConfigurationBuilder()
+ .useParallelExecutor(Runtime.getRuntime().availableProcessors())
+ .addUrls(ClasspathHelper.forPackage("org.apache.flink"))
+ .addScanners(new MemberUsageScanner()));
+
+
+ for (ForbiddenCall forbiddenCall : forbiddenCalls) {
+ final Set<Member> methodUsages = forbiddenCall.getUsages(reflections);
+ methodUsages.removeAll(forbiddenCall.getExclusions());
+ assertEquals("Unexpected calls: " + methodUsages,0, methodUsages.size());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
deleted file mode 100644
index 8fdf74b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.test.misc;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.FieldParserTest;
-import org.apache.flink.types.parser.VarLengthStringParserTest;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.reflections.Reflections;
-import org.reflections.scanners.MemberUsageScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Member;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class CheckForbiddenMethodsUsage {
-
- private static class ForbiddenCall {
- private final Method method;
- private final Constructor constructor;
- private final List<Member> exclusions;
-
- public Method getMethod() {
- return method;
- }
-
- public List<Member> getExclusions() {
- return exclusions;
- }
-
- private ForbiddenCall(Method method, Constructor ctor, List<Member> exclusions) {
- this.method = method;
- this.exclusions = exclusions;
- this.constructor = ctor;
- }
-
- public static ForbiddenCall of(Method method) {
- return new ForbiddenCall(method, null, Collections.<Member>emptyList());
- }
-
- public static ForbiddenCall of(Method method, List<Member> exclusions) {
- return new ForbiddenCall(method, null, exclusions);
- }
-
- public static ForbiddenCall of(Constructor ctor) {
- return new ForbiddenCall(null, ctor, Collections.<Member>emptyList());
- }
-
- public static ForbiddenCall of(Constructor ctor, List<Member> exclusions) {
- return new ForbiddenCall(null, ctor, exclusions);
- }
-
- public Set<Member> getUsages(Reflections reflections) {
- if (method == null) {
- return reflections.getConstructorUsage(constructor);
- }
-
- return reflections.getMethodUsage(method);
- }
- }
-
- private static List<ForbiddenCall> forbiddenCalls = new ArrayList<>();
-
- @BeforeClass
- public static void init() throws Exception {
- forbiddenCalls.add(ForbiddenCall.of(String.class.getMethod("getBytes"),
- Lists.<Member>newArrayList(
- FieldParserTest.class.getMethod("testEndsWithDelimiter"),
- FieldParserTest.class.getMethod("testDelimiterNext")
- )));
- forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class)));
- forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class)));
- forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class)));
- forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class, int.class)));
- }
-
- @Test
- public void testNoDefaultEncoding() throws Exception {
- final Reflections reflections = new Reflections(new ConfigurationBuilder()
- .addUrls(ClasspathHelper.forPackage("org.apache.flink"))
- .addScanners(new MemberUsageScanner()));
-
-
- for (ForbiddenCall forbiddenCall : forbiddenCalls) {
- final Set<Member> methodUsages = forbiddenCall.getUsages(reflections);
- methodUsages.removeAll(forbiddenCall.getExclusions());
- assertEquals("Unexpected calls: " + methodUsages,0, methodUsages.size());
- }
- }
-}