You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/09 12:02:34 UTC

[9/9] flink git commit: [FLINK-5824] (followup) Minor code cleanups in CheckForbiddenMethodsUsage

[FLINK-5824] (followup) Minor code cleanups in CheckForbiddenMethodsUsage

  - Move to maual tests package
  - Adjust order of methods (ctor, instance, class)
  - Replace Guava with Java Util
  - Make charset in SimpleStringSchema configurable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ab91cc5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ab91cc5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ab91cc5

Branch: refs/heads/master
Commit: 3ab91cc5d233265b7c4a1497e7294c9065261dec
Parents: 53fedbd
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 8 16:10:52 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:56 2017 +0100

----------------------------------------------------------------------
 .../util/serialization/SimpleStringSchema.java  |  61 ++++++++-
 .../serialization/SimpleStringSchemaTest.java   |  51 ++++++++
 .../test/manual/CheckForbiddenMethodsUsage.java | 126 +++++++++++++++++++
 .../test/misc/CheckForbiddenMethodsUsage.java   | 115 -----------------
 4 files changed, 235 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index 7f5a841..ddc55a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -20,19 +20,59 @@ package org.apache.flink.streaming.util.serialization;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.ConfigConstants;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Very simple serialization schema for strings.
+ * 
+ * <p>By default, the serializer uses "UTF-8" for string/byte conversion.
  */
 @PublicEvolving
 public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
 
 	private static final long serialVersionUID = 1L;
 
+	/** The charset to use to convert between strings and bytes.
+	 * The field is transient because we serialize a different delegate object instead */
+	private transient Charset charset;
+
+	/**
+	 * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
+	 */
+	public SimpleStringSchema() {
+		this(StandardCharsets.UTF_8);
+	}
+
+	/**
+	 * Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes.
+	 * 
+	 * @param charset The charset to use to convert between strings and bytes.
+	 */
+	public SimpleStringSchema(Charset charset) {
+		this.charset = checkNotNull(charset);
+	}
+
+	/**
+	 * Gets the charset used by this schema for serialization.
+	 * @return The charset used by this schema for serialization.
+	 */
+	public Charset getCharset() {
+		return charset;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Kafka Serialization
+	// ------------------------------------------------------------------------
+
 	@Override
 	public String deserialize(byte[] message) {
-		return new String(message, ConfigConstants.DEFAULT_CHARSET);
+		return new String(message, charset);
 	}
 
 	@Override
@@ -42,11 +82,26 @@ public class SimpleStringSchema implements DeserializationSchema<String>, Serial
 
 	@Override
 	public byte[] serialize(String element) {
-		return element.getBytes(ConfigConstants.DEFAULT_CHARSET);
+		return element.getBytes(charset);
 	}
 
 	@Override
 	public TypeInformation<String> getProducedType() {
 		return BasicTypeInfo.STRING_TYPE_INFO;
 	}
+
+	// ------------------------------------------------------------------------
+	//  Java Serialization
+	// ------------------------------------------------------------------------
+
+	private void writeObject (ObjectOutputStream out) throws IOException {
+		out.defaultWriteObject();
+		out.writeUTF(charset.name());
+	}
+
+	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		String charsetName = in.readUTF();
+		this.charset = Charset.forName(charsetName);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
new file mode 100644
index 0000000..74b1d18
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link SimpleStringSchema}.
+ */
+public class SimpleStringSchemaTest {
+
+	@Test
+	public void testSerializationWithAnotherCharset() {
+		final Charset charset = StandardCharsets.UTF_16BE;
+		final String string = "\u4e4b\u6383\u63cf\u53e4\u7c4d\u7248\u5be6\u4e43\u59da\u9f10\u7684";
+		final byte[] bytes = string.getBytes(charset);
+
+		assertArrayEquals(bytes, new SimpleStringSchema(charset).serialize(string));
+		assertEquals(string, new SimpleStringSchema(charset).deserialize(bytes));
+	}
+
+	@Test
+	public void testSerializability() throws Exception {
+		final SimpleStringSchema schema = new SimpleStringSchema(StandardCharsets.UTF_16LE);
+		final SimpleStringSchema copy = CommonTestUtils.createCopySerializable(schema);
+
+		assertEquals(schema.getCharset(), copy.getCharset());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
new file mode 100644
index 0000000..aabe7c0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.manual;
+
+import org.apache.flink.types.parser.FieldParserTest;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.reflections.Reflections;
+import org.reflections.scanners.MemberUsageScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Member;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests via reflection that certain methods are not called in Flink.
+ * 
+ * <p>Forbidden calls include:
+ *   - Byte / String conversions that do not specify an explicit charset
+ *     because they produce different results in different locales
+ */
+public class CheckForbiddenMethodsUsage {
+
+	private static class ForbiddenCall {
+
+		private final Method method;
+		private final Constructor<?> constructor;
+		private final List<Member> exclusions;
+
+		private ForbiddenCall(Method method, Constructor<?> ctor, List<Member> exclusions) {
+			this.method = method;
+			this.exclusions = exclusions;
+			this.constructor = ctor;
+		}
+
+		public Method getMethod() {
+			return method;
+		}
+
+		public List<Member> getExclusions() {
+			return exclusions;
+		}
+
+		public Set<Member> getUsages(Reflections reflections) {
+			if (method == null) {
+				return reflections.getConstructorUsage(constructor);
+			}
+
+			return reflections.getMethodUsage(method);
+		}
+
+		public static ForbiddenCall of(Method method) {
+			return new ForbiddenCall(method, null, Collections.<Member>emptyList());
+		}
+
+		public static ForbiddenCall of(Method method, List<Member> exclusions) {
+			return new ForbiddenCall(method, null, exclusions);
+		}
+
+		public static ForbiddenCall of(Constructor<?> ctor) {
+			return new ForbiddenCall(null, ctor, Collections.<Member>emptyList());
+		}
+
+		public static ForbiddenCall of(Constructor<?> ctor, List<Member> exclusions) {
+			return new ForbiddenCall(null, ctor, exclusions);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final List<ForbiddenCall> forbiddenCalls = new ArrayList<>();
+
+	@BeforeClass
+	public static void init() throws Exception {
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getMethod("getBytes"),
+			Arrays.<Member>asList(
+				FieldParserTest.class.getMethod("testEndsWithDelimiter"),
+				FieldParserTest.class.getMethod("testDelimiterNext")
+			)));
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class)));
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class)));
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class)));
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class, int.class)));
+	}
+
+	@Test
+	public void testNoDefaultEncoding() throws Exception {
+		final Reflections reflections = new Reflections(new ConfigurationBuilder()
+			.useParallelExecutor(Runtime.getRuntime().availableProcessors())
+			.addUrls(ClasspathHelper.forPackage("org.apache.flink"))
+			.addScanners(new MemberUsageScanner()));
+
+
+		for (ForbiddenCall forbiddenCall : forbiddenCalls) {
+			final Set<Member> methodUsages = forbiddenCall.getUsages(reflections);
+			methodUsages.removeAll(forbiddenCall.getExclusions());
+			assertEquals("Unexpected calls: " + methodUsages,0, methodUsages.size());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
deleted file mode 100644
index 8fdf74b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.test.misc;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.FieldParserTest;
-import org.apache.flink.types.parser.VarLengthStringParserTest;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.reflections.Reflections;
-import org.reflections.scanners.MemberUsageScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Member;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class CheckForbiddenMethodsUsage {
-
-	private static class ForbiddenCall {
-		private final Method method;
-		private final Constructor constructor;
-		private final List<Member> exclusions;
-
-		public Method getMethod() {
-			return method;
-		}
-
-		public List<Member> getExclusions() {
-			return exclusions;
-		}
-
-		private ForbiddenCall(Method method, Constructor ctor, List<Member> exclusions) {
-			this.method = method;
-			this.exclusions = exclusions;
-			this.constructor = ctor;
-		}
-
-		public static ForbiddenCall of(Method method) {
-			return new ForbiddenCall(method, null, Collections.<Member>emptyList());
-		}
-
-		public static ForbiddenCall of(Method method, List<Member> exclusions) {
-			return new ForbiddenCall(method, null, exclusions);
-		}
-
-		public static ForbiddenCall of(Constructor ctor) {
-			return new ForbiddenCall(null, ctor, Collections.<Member>emptyList());
-		}
-
-		public static ForbiddenCall of(Constructor ctor, List<Member> exclusions) {
-			return new ForbiddenCall(null, ctor, exclusions);
-		}
-
-		public Set<Member> getUsages(Reflections reflections) {
-			if (method == null) {
-				return reflections.getConstructorUsage(constructor);
-			}
-
-			return reflections.getMethodUsage(method);
-		}
-	}
-
-	private static List<ForbiddenCall> forbiddenCalls = new ArrayList<>();
-
-	@BeforeClass
-	public static void init() throws Exception {
-		forbiddenCalls.add(ForbiddenCall.of(String.class.getMethod("getBytes"),
-			Lists.<Member>newArrayList(
-				FieldParserTest.class.getMethod("testEndsWithDelimiter"),
-				FieldParserTest.class.getMethod("testDelimiterNext")
-			)));
-		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class)));
-		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class)));
-		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class)));
-		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class, int.class)));
-	}
-
-	@Test
-	public void testNoDefaultEncoding() throws Exception {
-		final Reflections reflections = new Reflections(new ConfigurationBuilder()
-			.addUrls(ClasspathHelper.forPackage("org.apache.flink"))
-			.addScanners(new MemberUsageScanner()));
-
-
-		for (ForbiddenCall forbiddenCall : forbiddenCalls) {
-			final Set<Member> methodUsages = forbiddenCall.getUsages(reflections);
-			methodUsages.removeAll(forbiddenCall.getExclusions());
-			assertEquals("Unexpected calls: " + methodUsages,0, methodUsages.size());
-		}
-	}
-}