You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/04/06 16:56:27 UTC
flink git commit: [FLINK-6268] [core] Object reuse for Either type
Repository: flink
Updated Branches:
refs/heads/master c5173fa26 -> ae17718fb
[FLINK-6268] [core] Object reuse for Either type
Implement object reuse for Flink's Either type: each Left holds a private
reference to a Right instance and each Right holds a private reference to a
Left instance. These references remain null until EitherSerializer sets them
while copying or deserializing into a reuse object.
This closes #3680
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae17718f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae17718f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae17718f
Branch: refs/heads/master
Commit: ae17718fb9262f65b89b4551d2aecaab3ee23d9e
Parents: c5173fa
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Apr 5 10:12:04 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Apr 6 12:55:49 2017 -0400
----------------------------------------------------------------------
.../typeutils/runtime/EitherSerializer.java | 51 +++-----
.../java/org/apache/flink/types/Either.java | 87 ++++++++++++-
.../typeutils/runtime/EitherSerializerTest.java | 129 +++++++++++++++++--
3 files changed, 227 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ae17718f/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index 4066e9a..d9018da 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -51,7 +51,7 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
@Override
public boolean isImmutableType() {
- return leftSerializer.isImmutableType() && rightSerializer.isImmutableType();
+ return false;
}
@Override
@@ -91,23 +91,16 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
@Override
public Either<L, R> copy(Either<L, R> from, Either<L, R> reuse) {
- if (from.isRight()) {
- final R right = from.right();
- if (reuse.isRight()) {
- R copyRight = rightSerializer.copy(right, reuse.right());
- return Right(copyRight);
- }
- else {
- // if the reuse record isn't a right value, we cannot reuse
- R copyRight = rightSerializer.copy(right);
- return Right(copyRight);
- }
- }
- else {
- L left = from.left();
- // reuse record is never a left value because we always create a right instance
- L copyLeft = leftSerializer.copy(left);
- return Left(copyLeft);
+ if (from.isLeft()) {
+ Left<L, R> to = Either.obtainLeft(reuse, leftSerializer);
+ L left = leftSerializer.copy(from.left(), to.left());
+ to.setValue(left);
+ return to;
+ } else {
+ Right<L, R> to = Either.obtainRight(reuse, rightSerializer);
+ R right = rightSerializer.copy(from.right(), to.right());
+ to.setValue(right);
+ return to;
}
}
@@ -142,18 +135,16 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
@Override
public Either<L, R> deserialize(Either<L, R> reuse, DataInputView source) throws IOException {
boolean isLeft = source.readBoolean();
- if (!isLeft) {
- if (reuse.isRight()) {
- return Right(rightSerializer.deserialize(reuse.right(), source));
- }
- else {
- // if the reuse record isn't a right value, we cannot reuse
- return Right(rightSerializer.deserialize(source));
- }
- }
- else {
- // reuse record is never a left value because we always create a right instance
- return Left(leftSerializer.deserialize(source));
+ if (isLeft) {
+ Left<L, R> to = Either.obtainLeft(reuse, leftSerializer);
+ L left = leftSerializer.deserialize(to.left(), source);
+ to.setValue(left);
+ return to;
+ } else {
+ Right<L, R> to = Either.obtainRight(reuse, rightSerializer);
+ R right = rightSerializer.deserialize(to.right(), source);
+ to.setValue(right);
+ return to;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ae17718f/flink-core/src/main/java/org/apache/flink/types/Either.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Either.java b/flink-core/src/main/java/org/apache/flink/types/Either.java
index a08e968..80f62f6 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Either.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Either.java
@@ -18,9 +18,12 @@
package org.apache.flink.types;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.EitherTypeInfoFactory;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
/**
* This type represents a value of one two possible types, Left or Right (a
@@ -92,7 +95,9 @@ public abstract class Either<L, R> {
* the type of Right
*/
public static class Left<L, R> extends Either<L, R> {
- private final L value;
+ private L value;
+
+ private Right<L, R> right;
public Left(L value) {
this.value = java.util.Objects.requireNonNull(value);
@@ -108,6 +113,15 @@ public abstract class Either<L, R> {
throw new IllegalStateException("Cannot retrieve Right value on a Left");
}
+ /**
+ * Replaces the encapsulated Left value (used by EitherSerializer for object reuse).
+ *
+ * @param value the new Left value; unlike the constructor, null is not rejected
+ */
+ public void setValue(L value) {
+ this.value = value;
+ }
+
@Override
public boolean equals(Object object) {
if (object instanceof Left<?, ?>) {
@@ -145,7 +159,9 @@ public abstract class Either<L, R> {
* the type of Right
*/
public static class Right<L, R> extends Either<L, R> {
- private final R value;
+ private R value;
+
+ private Left<L, R> left;
public Right(R value) {
this.value = java.util.Objects.requireNonNull(value);
@@ -161,6 +177,15 @@ public abstract class Either<L, R> {
return value;
}
+ /**
+ * Replaces the encapsulated Right value (used by EitherSerializer for object reuse).
+ *
+ * @param value the new Right value; unlike the constructor, null is not rejected
+ */
+ public void setValue(R value) {
+ this.value = value;
+ }
+
@Override
public boolean equals(Object object) {
if (object instanceof Right<?, ?>) {
@@ -188,4 +213,62 @@ public abstract class Either<L, R> {
return new Right<L, R>(right);
}
}
+
+ /**
+ * Utility function for {@link EitherSerializer} to support object reuse.
+ *
+ * To support object reuse, both subclasses of Either contain a reference to
+ * an instance of the other type. This method provides access to and
+ * initializes the cross-reference.
+ *
+ * @param input container for Left or Right value
+ * @param leftSerializer for creating an instance of the left type
+ * @param <L>
+ * the type of Left
+ * @param <R>
+ * the type of Right
+ * @return input if it is a Left, otherwise input's (possibly newly created) Left cross-reference
+ */
+ @Internal
+ public static <L, R> Left<L, R> obtainLeft(Either<L, R> input, TypeSerializer<L> leftSerializer) {
+ if (input.isLeft()) {
+ return (Left<L, R>) input;
+ } else {
+ Right<L, R> right = (Right<L, R>) input;
+ if (right.left == null) {
+ right.left = Left.of(leftSerializer.createInstance());
+ right.left.right = right;
+ }
+ return right.left;
+ }
+ }
+
+ /**
+ * Utility function for {@link EitherSerializer} to support object reuse.
+ *
+ * To support object reuse, both subclasses of Either contain a reference to
+ * an instance of the other type. This method provides access to and
+ * initializes the cross-reference.
+ *
+ * @param input container for Left or Right value
+ * @param rightSerializer for creating an instance of the right type
+ * @param <L>
+ * the type of Left
+ * @param <R>
+ * the type of Right
+ * @return input if it is a Right, otherwise input's (possibly newly created) Right cross-reference
+ */
+ @Internal
+ public static <L, R> Right<L, R> obtainRight(Either<L, R> input, TypeSerializer<R> rightSerializer) {
+ if (input.isRight()) {
+ return (Right<L, R>) input;
+ } else {
+ Left<L, R> left = (Left<L, R>) input;
+ if (left.right == null) {
+ left.right = Right.of(rightSerializer.createInstance());
+ left.right.left = left;
+ }
+ return left.right;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ae17718f/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
index acf0d2e..2bf6abc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
@@ -18,21 +18,30 @@
package org.apache.flink.api.java.typeutils.runtime;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.apache.flink.types.Either.Left;
-import static org.apache.flink.types.Either.Right;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase.TestInputView;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase.TestOutputView;
import org.apache.flink.api.common.typeutils.SerializerTestInstance;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.Either;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
import org.junit.Test;
+import java.io.IOException;
+
+import static junit.framework.TestCase.assertSame;
+import static org.apache.flink.types.Either.Left;
+import static org.apache.flink.types.Either.Right;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
public class EitherSerializerTest {
@SuppressWarnings("unchecked")
@@ -55,6 +64,26 @@ public class EitherSerializerTest {
testInstance.testAll();
}
+ @Test
+ public void testStringValueDoubleValueEither() {
+ @SuppressWarnings("unchecked")
+ Either<StringValue, DoubleValue>[] testData = new Either[] {
+ Left(new StringValue("banana")),
+ Left.of(new StringValue("apple")),
+ new Left(new StringValue("")),
+ Right(new DoubleValue(32.0)),
+ Right.of(new DoubleValue(Double.MIN_VALUE)),
+ new Right(new DoubleValue(Double.MAX_VALUE))};
+
+ EitherTypeInfo<StringValue, DoubleValue> eitherTypeInfo = new EitherTypeInfo<>(
+ ValueTypeInfo.STRING_VALUE_TYPE_INFO, ValueTypeInfo.DOUBLE_VALUE_TYPE_INFO);
+ EitherSerializer<StringValue, DoubleValue> eitherSerializer =
+ (EitherSerializer<StringValue, DoubleValue>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+ SerializerTestInstance<Either<StringValue, DoubleValue>> testInstance =
+ new EitherSerializerTestInstance<>(eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData);
+ testInstance.testAll();
+ }
+
@SuppressWarnings("unchecked")
@Test
public void testEitherWithTuple() {
@@ -78,6 +107,90 @@ public class EitherSerializerTest {
testInstance.testAll();
}
+ @Test
+ public void testEitherWithTupleValues() {
+ @SuppressWarnings("unchecked")
+ Either<Tuple2<LongValue, LongValue>, DoubleValue>[] testData = new Either[] {
+ Left(new Tuple2<>(new LongValue(2L), new LongValue(9L))),
+ new Left<>(new Tuple2<>(new LongValue(Long.MIN_VALUE), new LongValue(Long.MAX_VALUE))),
+ new Right<>(new DoubleValue(32.0)),
+ Right(new DoubleValue(Double.MIN_VALUE)),
+ Right(new DoubleValue(Double.MAX_VALUE))};
+
+ EitherTypeInfo<Tuple2<LongValue, LongValue>, DoubleValue> eitherTypeInfo = new EitherTypeInfo<>(
+ new TupleTypeInfo<Tuple2<LongValue, LongValue>>(ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.LONG_VALUE_TYPE_INFO),
+ ValueTypeInfo.DOUBLE_VALUE_TYPE_INFO);
+ EitherSerializer<Tuple2<LongValue, LongValue>, DoubleValue> eitherSerializer =
+ (EitherSerializer<Tuple2<LongValue, LongValue>, DoubleValue>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+ SerializerTestInstance<Either<Tuple2<LongValue, LongValue>, DoubleValue>> testInstance =
+ new EitherSerializerTestInstance<>(eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData);
+ testInstance.testAll();
+ }
+
+ @Test
+ public void testEitherWithObjectReuse() {
+ EitherTypeInfo<LongValue, DoubleValue> eitherTypeInfo = new EitherTypeInfo<>(
+ ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.DOUBLE_VALUE_TYPE_INFO);
+ EitherSerializer<LongValue, DoubleValue> eitherSerializer =
+ (EitherSerializer<LongValue, DoubleValue>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+
+ LongValue lv = new LongValue();
+ DoubleValue dv = new DoubleValue();
+
+ Either<LongValue, DoubleValue> left = Left(lv);
+ Either<LongValue, DoubleValue> right = Right(dv);
+
+ // the first copy creates a new instance of Left
+ Either<LongValue, DoubleValue> copy0 = eitherSerializer.copy(left, right);
+
+ // then the cross-references are used for future copies
+ Either<LongValue, DoubleValue> copy1 = eitherSerializer.copy(right, copy0);
+ Either<LongValue, DoubleValue> copy2 = eitherSerializer.copy(left, copy1);
+
+ // validate reference equality
+ assertSame(right, copy1);
+ assertSame(copy0, copy2);
+
+ // validate reference equality of contained objects
+ assertSame(right.right(), copy1.right());
+ assertSame(copy0.left(), copy2.left());
+ }
+
+ @Test
+ public void testSerializeIndividually() throws IOException {
+ EitherTypeInfo<LongValue, DoubleValue> eitherTypeInfo = new EitherTypeInfo<>(
+ ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.DOUBLE_VALUE_TYPE_INFO);
+ EitherSerializer<LongValue, DoubleValue> eitherSerializer =
+ (EitherSerializer<LongValue, DoubleValue>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+
+ LongValue lv = new LongValue();
+ DoubleValue dv = new DoubleValue();
+
+ Either<LongValue, DoubleValue> left = Left(lv);
+ Either<LongValue, DoubleValue> right = Right(dv);
+
+ TestOutputView out = new TestOutputView();
+ eitherSerializer.serialize(left, out);
+ eitherSerializer.serialize(right, out);
+ eitherSerializer.serialize(left, out);
+
+ TestInputView in = out.getInputView();
+ // the first deserialization creates a new instance of Left
+ Either<LongValue, DoubleValue> copy0 = eitherSerializer.deserialize(right, in);
+
+ // then the cross-references are used for subsequent deserializations
+ Either<LongValue, DoubleValue> copy1 = eitherSerializer.deserialize(copy0, in);
+ Either<LongValue, DoubleValue> copy2 = eitherSerializer.deserialize(copy1, in);
+
+ // validate reference equality
+ assertSame(right, copy1);
+ assertSame(copy0, copy2);
+
+ // validate reference equality of contained objects
+ assertSame(right.right(), copy1.right());
+ assertSame(copy0.left(), copy2.left());
+ }
+
/**
* {@link org.apache.flink.api.common.typeutils.SerializerTestBase#testInstantiate()}
* checks that the type of the created instance is the same as the type class parameter.
@@ -98,7 +211,7 @@ public class EitherSerializerTest {
T instance = serializer.createInstance();
assertNotNull("The created instance must not be null.", instance);
-
+
Class<T> type = getTypeClass();
assertNotNull("The test is corrupt: type class is null.", type);
}
@@ -108,6 +221,6 @@ public class EitherSerializerTest {
fail("Exception in test: " + e.getMessage());
}
}
-
+
}
}