You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/04/06 16:56:27 UTC

flink git commit: [FLINK-6268] [core] Object reuse for Either type

Repository: flink
Updated Branches:
  refs/heads/master c5173fa26 -> ae17718fb


[FLINK-6268] [core] Object reuse for Either type

Implement object reuse for Flink's Either type by storing a reference to
Right in Left and Left in Right. These references are private and remain
null until set by EitherSerializer when copying or deserializing with
object reuse.

This closes #3680


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae17718f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae17718f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae17718f

Branch: refs/heads/master
Commit: ae17718fb9262f65b89b4551d2aecaab3ee23d9e
Parents: c5173fa
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Apr 5 10:12:04 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Apr 6 12:55:49 2017 -0400

----------------------------------------------------------------------
 .../typeutils/runtime/EitherSerializer.java     |  51 +++-----
 .../java/org/apache/flink/types/Either.java     |  87 ++++++++++++-
 .../typeutils/runtime/EitherSerializerTest.java | 129 +++++++++++++++++--
 3 files changed, 227 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae17718f/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index 4066e9a..d9018da 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -51,7 +51,7 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
 
 	@Override
 	public boolean isImmutableType() {
-		return leftSerializer.isImmutableType() && rightSerializer.isImmutableType();
+		return false;
 	}
 
 	@Override
@@ -91,23 +91,16 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
 
 	@Override
 	public Either<L, R> copy(Either<L, R> from, Either<L, R> reuse) {
-		if (from.isRight()) {
-			final R right = from.right();
-			if (reuse.isRight()) {
-				R copyRight = rightSerializer.copy(right, reuse.right());
-				return Right(copyRight);
-			}
-			else {
-				// if the reuse record isn't a right value, we cannot reuse
-				R copyRight = rightSerializer.copy(right);
-				return Right(copyRight);
-			}
-		}
-		else {
-			L left = from.left();
-			// reuse record is never a left value because we always create a right instance
-			L copyLeft = leftSerializer.copy(left);
-			return Left(copyLeft);
+		if (from.isLeft()) {
+			Left<L, R> to = Either.obtainLeft(reuse, leftSerializer);
+			L left = leftSerializer.copy(from.left(), to.left());
+			to.setValue(left);
+			return to;
+		} else {
+			Right<L, R> to = Either.obtainRight(reuse, rightSerializer);
+			R right = rightSerializer.copy(from.right(), to.right());
+			to.setValue(right);
+			return to;
 		}
 	}
 
@@ -142,18 +135,16 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
 	@Override
 	public Either<L, R> deserialize(Either<L, R> reuse, DataInputView source) throws IOException {
 		boolean isLeft = source.readBoolean();
-		if (!isLeft) {
-			if (reuse.isRight()) {
-				return Right(rightSerializer.deserialize(reuse.right(), source));
-			}
-			else {
-				// if the reuse record isn't a right value, we cannot reuse
-				return Right(rightSerializer.deserialize(source));
-			}
-		}
-		else {
-			// reuse record is never a left value because we always create a right instance
-			return Left(leftSerializer.deserialize(source));
+		if (isLeft) {
+			Left<L, R> to = Either.obtainLeft(reuse, leftSerializer);
+			L left = leftSerializer.deserialize(to.left(), source);
+			to.setValue(left);
+			return to;
+		} else {
+			Right<L, R> to = Either.obtainRight(reuse, rightSerializer);
+			R right = rightSerializer.deserialize(to.right(), source);
+			to.setValue(right);
+			return to;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae17718f/flink-core/src/main/java/org/apache/flink/types/Either.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Either.java b/flink-core/src/main/java/org/apache/flink/types/Either.java
index a08e968..80f62f6 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Either.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Either.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.types;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.EitherTypeInfoFactory;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
 
 /**
  * This type represents a value of one two possible types, Left or Right (a
@@ -92,7 +95,9 @@ public abstract class Either<L, R> {
 	 *            the type of Right
 	 */
 	public static class Left<L, R> extends Either<L, R> {
-		private final L value;
+		private L value;
+
+		private Right<L, R> right;
 
 		public Left(L value) {
 			this.value = java.util.Objects.requireNonNull(value);
@@ -108,6 +113,15 @@ public abstract class Either<L, R> {
 			throw new IllegalStateException("Cannot retrieve Right value on a Left");
 		}
 
+		/**
+		 * Sets the encapsulated value to another value. Unlike the constructor, this setter performs no null check.
+		 *
+		 * @param value the new encapsulated value
+		 */
+		public void setValue(L value) {
+			this.value = value;
+		}
+
 		@Override
 		public boolean equals(Object object) {
 			if (object instanceof Left<?, ?>) {
@@ -145,7 +159,9 @@ public abstract class Either<L, R> {
 	 *            the type of Right
 	 */
 	public static class Right<L, R> extends Either<L, R> {
-		private final R value;
+		private R value;
+
+		private Left<L, R> left;
 
 		public Right(R value) {
 			this.value = java.util.Objects.requireNonNull(value);
@@ -161,6 +177,15 @@ public abstract class Either<L, R> {
 			return value;
 		}
 
+		/**
+		 * Sets the encapsulated value to another value. Unlike the constructor, this setter performs no null check.
+		 *
+		 * @param value the new encapsulated value
+		 */
+		public void setValue(R value) {
+			this.value = value;
+		}
+
 		@Override
 		public boolean equals(Object object) {
 			if (object instanceof Right<?, ?>) {
@@ -188,4 +213,62 @@ public abstract class Either<L, R> {
 			return new Right<L, R>(right);
 		}
 	}
+
+	/**
+	 * Utility function for {@link EitherSerializer} to support object reuse.
+	 *
+	 * <p>To support object reuse, both subclasses of Either contain a reference to
+	 * an instance of the other type. This method provides access to the
+	 * cross-reference and initializes it on first use.
+	 *
+	 * @param input container for Left or Right value
+	 * @param leftSerializer for creating an instance of the left type
+	 * @param <L>
+	 *            the type of Left
+	 * @param <R>
+	 *            the type of Right
+	 * @return input itself if it is a Left, otherwise the Left instance cross-referenced by input
+	 */
+	@Internal
+	public static <L, R> Left<L, R> obtainLeft(Either<L, R> input, TypeSerializer<L> leftSerializer) {
+		if (input.isLeft()) {
+			return (Left<L, R>) input;
+		} else {
+			Right<L, R> right = (Right<L, R>) input;
+			if (right.left == null) {
+				right.left = Left.of(leftSerializer.createInstance());
+				right.left.right = right;
+			}
+			return right.left;
+		}
+	}
+
+	/**
+	 * Utility function for {@link EitherSerializer} to support object reuse.
+	 *
+	 * <p>To support object reuse, both subclasses of Either contain a reference to
+	 * an instance of the other type. This method provides access to the
+	 * cross-reference and initializes it on first use.
+	 *
+	 * @param input container for Left or Right value
+	 * @param rightSerializer for creating an instance of the right type
+	 * @param <L>
+	 *            the type of Left
+	 * @param <R>
+	 *            the type of Right
+	 * @return input itself if it is a Right, otherwise the Right instance cross-referenced by input
+	 */
+	@Internal
+	public static <L, R> Right<L, R> obtainRight(Either<L, R> input, TypeSerializer<R> rightSerializer) {
+		if (input.isRight()) {
+			return (Right<L, R>) input;
+		} else {
+			Left<L, R> left = (Left<L, R>) input;
+			if (left.right == null) {
+				left.right = Right.of(rightSerializer.createInstance());
+				left.right.left = left;
+			}
+			return left.right;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae17718f/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
index acf0d2e..2bf6abc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
@@ -18,21 +18,30 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.apache.flink.types.Either.Left;
-import static org.apache.flink.types.Either.Right;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase.TestInputView;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase.TestOutputView;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.Either;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
 import org.junit.Test;
 
+import java.io.IOException;
+
+import static junit.framework.TestCase.assertSame;
+import static org.apache.flink.types.Either.Left;
+import static org.apache.flink.types.Either.Right;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
 public class EitherSerializerTest {
 
 	@SuppressWarnings("unchecked")
@@ -55,6 +64,26 @@ public class EitherSerializerTest {
 	testInstance.testAll();
 	}
 
+	@Test
+	public void testStringValueDoubleValueEither() {
+		@SuppressWarnings("unchecked")
+		Either<StringValue, DoubleValue>[] testData = new Either[] {
+			Left(new StringValue("banana")),
+			Left.of(new StringValue("apple")),
+			new Left(new StringValue("")),
+			Right(new DoubleValue(32.0)),
+			Right.of(new DoubleValue(Double.MIN_VALUE)),
+			new Right(new DoubleValue(Double.MAX_VALUE))};
+
+		EitherTypeInfo<StringValue, DoubleValue> eitherTypeInfo = new EitherTypeInfo<>(
+			ValueTypeInfo.STRING_VALUE_TYPE_INFO, ValueTypeInfo.DOUBLE_VALUE_TYPE_INFO);
+		EitherSerializer<StringValue, DoubleValue> eitherSerializer =
+			(EitherSerializer<StringValue, DoubleValue>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+		SerializerTestInstance<Either<StringValue, DoubleValue>> testInstance =
+			new EitherSerializerTestInstance<>(eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData);
+		testInstance.testAll();
+	}
+
 	@SuppressWarnings("unchecked")
 	@Test
 	public void testEitherWithTuple() {
@@ -78,6 +107,90 @@ public class EitherSerializerTest {
 	testInstance.testAll();
 	}
 
+	@Test
+	public void testEitherWithTupleValues() {
+		@SuppressWarnings("unchecked")
+		Either<Tuple2<LongValue, LongValue>, DoubleValue>[] testData = new Either[] {
+			Left(new Tuple2<>(new LongValue(2L), new LongValue(9L))),
+			new Left<>(new Tuple2<>(new LongValue(Long.MIN_VALUE), new LongValue(Long.MAX_VALUE))),
+			new Right<>(new DoubleValue(32.0)),
+			Right(new DoubleValue(Double.MIN_VALUE)),
+			Right(new DoubleValue(Double.MAX_VALUE))};
+
+		EitherTypeInfo<Tuple2<LongValue, LongValue>, DoubleValue> eitherTypeInfo = new EitherTypeInfo<>(
+			new TupleTypeInfo<Tuple2<LongValue, LongValue>>(ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.LONG_VALUE_TYPE_INFO),
+			ValueTypeInfo.DOUBLE_VALUE_TYPE_INFO);
+		EitherSerializer<Tuple2<LongValue, LongValue>, DoubleValue> eitherSerializer =
+			(EitherSerializer<Tuple2<LongValue, LongValue>, DoubleValue>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+		SerializerTestInstance<Either<Tuple2<LongValue, LongValue>, DoubleValue>> testInstance =
+			new EitherSerializerTestInstance<>(eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData);
+		testInstance.testAll();
+	}
+
+	@Test
+	public void testEitherWithObjectReuse() {
+		EitherTypeInfo<LongValue, DoubleValue> eitherTypeInfo = new EitherTypeInfo<>(
+			ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.DOUBLE_VALUE_TYPE_INFO);
+		EitherSerializer<LongValue, DoubleValue> eitherSerializer =
+			(EitherSerializer<LongValue, DoubleValue>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+
+		LongValue lv = new LongValue();
+		DoubleValue dv = new DoubleValue();
+
+		Either<LongValue, DoubleValue> left = Left(lv);
+		Either<LongValue, DoubleValue> right = Right(dv);
+
+		// the first copy creates a new instance of Left
+		Either<LongValue, DoubleValue> copy0 = eitherSerializer.copy(left, right);
+
+		// then the cross-references are used for future copies
+		Either<LongValue, DoubleValue> copy1 = eitherSerializer.copy(right, copy0);
+		Either<LongValue, DoubleValue> copy2 = eitherSerializer.copy(left, copy1);
+
+		// validate reference equality
+		assertSame(right, copy1);
+		assertSame(copy0, copy2);
+
+		// validate reference equality of contained objects
+		assertSame(right.right(), copy1.right());
+		assertSame(copy0.left(), copy2.left());
+	}
+
+	@Test
+	public void testSerializeIndividually() throws IOException {
+		EitherTypeInfo<LongValue, DoubleValue> eitherTypeInfo = new EitherTypeInfo<>(
+			ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.DOUBLE_VALUE_TYPE_INFO);
+		EitherSerializer<LongValue, DoubleValue> eitherSerializer =
+			(EitherSerializer<LongValue, DoubleValue>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+
+		LongValue lv = new LongValue();
+		DoubleValue dv = new DoubleValue();
+
+		Either<LongValue, DoubleValue> left = Left(lv);
+		Either<LongValue, DoubleValue> right = Right(dv);
+
+		TestOutputView out = new TestOutputView();
+		eitherSerializer.serialize(left, out);
+		eitherSerializer.serialize(right, out);
+		eitherSerializer.serialize(left, out);
+
+		TestInputView in = out.getInputView();
+		// the first deserialization creates a new instance of Left
+		Either<LongValue, DoubleValue> copy0 = eitherSerializer.deserialize(right, in);
+
+		// then the cross-references are used for future deserializations
+		Either<LongValue, DoubleValue> copy1 = eitherSerializer.deserialize(copy0, in);
+		Either<LongValue, DoubleValue> copy2 = eitherSerializer.deserialize(copy1, in);
+
+		// validate reference equality
+		assertSame(right, copy1);
+		assertSame(copy0, copy2);
+
+		// validate reference equality of contained objects
+		assertSame(right.right(), copy1.right());
+		assertSame(copy0.left(), copy2.left());
+	}
+
 	/**
 	 * {@link org.apache.flink.api.common.typeutils.SerializerTestBase#testInstantiate()}
 	 * checks that the type of the created instance is the same as the type class parameter.
@@ -98,7 +211,7 @@ public class EitherSerializerTest {
 
 				T instance = serializer.createInstance();
 				assertNotNull("The created instance must not be null.", instance);
-				
+
 				Class<T> type = getTypeClass();
 				assertNotNull("The test is corrupt: type class is null.", type);
 			}
@@ -108,6 +221,6 @@ public class EitherSerializerTest {
 				fail("Exception in test: " + e.getMessage());
 			}
 		}
-		
+
 	}
 }