You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/29 14:08:36 UTC

[5/8] flink git commit: [FLINK-2723] [core] CopyableValue method to copy into new instance

[FLINK-2723] [core] CopyableValue method to copy into new instance

This closes #1169


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e727355e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e727355e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e727355e

Branch: refs/heads/master
Commit: e727355e42bd0ad7d403aee703aaf33a68a839d2
Parents: 40cbf7e
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Sep 21 15:14:09 2015 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 29 12:18:07 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/types/BooleanValue.java    |   6 +-
 .../java/org/apache/flink/types/ByteValue.java  |   7 +-
 .../java/org/apache/flink/types/CharValue.java  |   7 +-
 .../org/apache/flink/types/CopyableValue.java   |  30 ++++-
 .../org/apache/flink/types/DoubleValue.java     |   7 +-
 .../java/org/apache/flink/types/FloatValue.java |   7 +-
 .../java/org/apache/flink/types/IntValue.java   |   9 +-
 .../java/org/apache/flink/types/LongValue.java  |   7 +-
 .../java/org/apache/flink/types/NullValue.java  |   7 +-
 .../java/org/apache/flink/types/Record.java     |   5 +
 .../java/org/apache/flink/types/ShortValue.java |   7 +-
 .../org/apache/flink/types/StringValue.java     |   7 +-
 .../apache/flink/types/CopyableValueTest.java   | 109 +++++++++++++++++++
 13 files changed, 203 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
index 071e650..e034648 100644
--- a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
@@ -48,7 +48,6 @@ public class BooleanValue implements NormalizableKey<BooleanValue>, ResettableVa
 		this.value = value;
 	}
 
-	
 	public boolean get() {
 		return value;
 	}
@@ -118,6 +117,11 @@ public class BooleanValue implements NormalizableKey<BooleanValue>, ResettableVa
 	}
 
 	@Override
+	public BooleanValue copy() {
+		return new BooleanValue(this.value);
+	}
+
+	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source, 1);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
index c2f1f10..40ed1ad 100644
--- a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
@@ -147,13 +147,18 @@ public class ByteValue implements NormalizableKey<ByteValue>, ResettableValue<By
 	public int getBinaryLength() {
 		return 1;
 	}
-	
+
 	@Override
 	public void copyTo(ByteValue target) {
 		target.value = this.value;
 	}
 
 	@Override
+	public ByteValue copy() {
+		return new ByteValue(this.value);
+	}
+
+	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source, 1);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/CharValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/CharValue.java b/flink-core/src/main/java/org/apache/flink/types/CharValue.java
index 3fd9f29..06b67c7 100644
--- a/flink-core/src/main/java/org/apache/flink/types/CharValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/CharValue.java
@@ -151,13 +151,18 @@ public class CharValue implements NormalizableKey<CharValue>, ResettableValue<Ch
 	public int getBinaryLength() {
 		return 2;
 	}
-	
+
 	@Override
 	public void copyTo(CharValue target) {
 		target.value = this.value;
 	}
 
 	@Override
+	public CharValue copy() {
+		return new CharValue(this.value);
+	}
+
+	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source, 2);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java b/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java
index 57e0c46..3974cb2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java
@@ -35,7 +35,35 @@ public interface CopyableValue<T> extends Value {
 	 */
 	int getBinaryLength();
 	
+	/**
+	 * Performs a deep copy of this object into the {@code target} instance.
+	 *
+	 * @param target Object to copy into.
+	 */
 	void copyTo(T target);
-	
+
+	/**
+	 * Performs a deep copy of this object into a new instance.
+         *
+         * This method is useful for generic user-defined functions to clone a
+         * {@link CopyableValue} when storing multiple objects. With object reuse
+         * a deep copy must be created and type erasure prevents calling new.
+	 *
+	 * @return New object with copied fields.
+	 */
+	T copy();
+
+	/**
+	 * Copies the next serialized instance from {@code source} to {@code target}.
+	 *
+	 * This method is equivalent to calling {@code IOReadableWritable.read(DataInputView)}
+	 * followed by {@code IOReadableWritable.write(DataOutputView)} but does not require
+	 * intermediate deserialization.
+	 *
+	 * @param source Data source for serialized instance.
+	 * @param target Data target for serialized instance.
+	 *
+	 * @see org.apache.flink.core.io.IOReadableWritable
+	 */
 	void copy(DataInputView source, DataOutputView target) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
index abcbcf5..3158e40 100644
--- a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
@@ -122,13 +122,18 @@ public class DoubleValue implements Key<DoubleValue>, ResettableValue<DoubleValu
 	public int getBinaryLength() {
 		return 8;
 	}
-	
+
 	@Override
 	public void copyTo(DoubleValue target) {
 		target.value = this.value;
 	}
 
 	@Override
+	public DoubleValue copy() {
+		return new DoubleValue(this.value);
+	}
+
+	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source, 8);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
index d57c74b..5364203 100644
--- a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
@@ -121,13 +121,18 @@ public class FloatValue implements Key<FloatValue>, ResettableValue<FloatValue>,
 	public int getBinaryLength() {
 		return 4;
 	}
-	
+
 	@Override
 	public void copyTo(FloatValue target) {
 		target.value = this.value;
 	}
 
 	@Override
+	public FloatValue copy() {
+		return new FloatValue(this.value);
+	}
+
+	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source, 4);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/IntValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/IntValue.java b/flink-core/src/main/java/org/apache/flink/types/IntValue.java
index 423c8c1..1b893f0 100644
--- a/flink-core/src/main/java/org/apache/flink/types/IntValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/IntValue.java
@@ -139,7 +139,7 @@ public class IntValue implements NormalizableKey<IntValue>, ResettableValue<IntV
 			}
 		}
 		else {
-			target.putIntBigEndian(offset, value  - Integer.MIN_VALUE);
+			target.putIntBigEndian(offset, value - Integer.MIN_VALUE);
 			for (int i = 4; i < len; i++) {
 				target.put(offset + i, (byte) 0);
 			}
@@ -152,13 +152,18 @@ public class IntValue implements NormalizableKey<IntValue>, ResettableValue<IntV
 	public int getBinaryLength() {
 		return 4;
 	}
-	
+
 	@Override
 	public void copyTo(IntValue target) {
 		target.value = this.value;
 	}
 
 	@Override
+	public IntValue copy() {
+		return new IntValue(this.value);
+	}
+
+	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source, 4);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/LongValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/LongValue.java b/flink-core/src/main/java/org/apache/flink/types/LongValue.java
index e8fcd53..2b6cb1f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/LongValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/LongValue.java
@@ -160,13 +160,18 @@ public class LongValue implements NormalizableKey<LongValue>, ResettableValue<Lo
 	public int getBinaryLength() {
 		return 8;
 	}
-	
+
 	@Override
 	public void copyTo(LongValue target) {
 		target.value = this.value;
 	}
 
 	@Override
+	public LongValue copy() {
+		return new LongValue(this.value);
+	}
+
+	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source, 8);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/NullValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullValue.java b/flink-core/src/main/java/org/apache/flink/types/NullValue.java
index 5391b7b..aa56536 100644
--- a/flink-core/src/main/java/org/apache/flink/types/NullValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NullValue.java
@@ -110,12 +110,17 @@ public final class NullValue implements NormalizableKey<NullValue>, CopyableValu
 	public int getBinaryLength() {
 		return 1;
 	}
-	
+
 	@Override
 	public void copyTo(NullValue target) {
 	}
 
 	@Override
+	public NullValue copy() {
+		return NullValue.getInstance();
+	}
+
+	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		source.readBoolean();
 		target.writeBoolean(false);

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/Record.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java b/flink-core/src/main/java/org/apache/flink/types/Record.java
index 218d6ce..24ff979 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Record.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Record.java
@@ -763,6 +763,11 @@ public final class Record implements Value, CopyableValue<Record> {
 	}
 
 	@Override
+	public Record copy() {
+		return createCopy();
+	}
+
+	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		int val = source.readUnsignedByte();
 		target.writeByte(val);

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
index ec03497..f18ce7f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
@@ -154,13 +154,18 @@ public class ShortValue implements NormalizableKey<ShortValue>, ResettableValue<
 	public int getBinaryLength() {
 		return 2;
 	}
-	
+
 	@Override
 	public void copyTo(ShortValue target) {
 		target.value = this.value;
 	}
 
 	@Override
+	public ShortValue copy() {
+		return new ShortValue(this.value);
+	}
+
+	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source, 2);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/StringValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
index db0f184..2249019 100644
--- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
@@ -682,7 +682,12 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
 		target.ensureSize(this.len);
 		System.arraycopy(this.value, 0, target.value, 0, this.len);
 	}
-	
+
+	@Override
+	public StringValue copy() {
+		return new StringValue(this);
+	}
+
 	@Override
 	public void copy(DataInputView in, DataOutputView target) throws IOException {
 		int len = in.readUnsignedByte();

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
new file mode 100644
index 0000000..76bdece
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class CopyableValueTest {
+
+	@Test
+	public void testCopy() {
+		CopyableValue<?>[] value_types = new CopyableValue[] {
+			new BooleanValue(true),
+			new ByteValue((byte) 42),
+			new CharValue('q'),
+			new DoubleValue(3.1415926535897932),
+			new FloatValue((float) 3.14159265),
+			new IntValue(42),
+			new LongValue(42l),
+			new NullValue(),
+			new ShortValue((short) 42),
+			new StringValue("QED")
+		};
+
+		for (CopyableValue<?> type : value_types) {
+			assertEquals(type, type.copy());
+		}
+	}
+
+	@Test
+	public void testCopyTo() {
+		BooleanValue boolean_from = new BooleanValue(true);
+		BooleanValue boolean_to = new BooleanValue(false);
+
+		boolean_from.copyTo(boolean_to);
+		assertEquals(boolean_from, boolean_to);
+
+		ByteValue byte_from = new ByteValue((byte) 3);
+		ByteValue byte_to = new ByteValue((byte) 7);
+
+		byte_from.copyTo(byte_to);
+		assertEquals(byte_from, byte_to);
+
+		CharValue char_from = new CharValue('α');
+		CharValue char_to = new CharValue('ω');
+
+		char_from.copyTo(char_to);
+		assertEquals(char_from, char_to);
+
+		DoubleValue double_from = new DoubleValue(2.7182818284590451);
+		DoubleValue double_to = new DoubleValue(0);
+
+		double_from.copyTo(double_to);
+		assertEquals(double_from, double_to);
+
+		FloatValue float_from = new FloatValue((float) 2.71828182);
+		FloatValue float_to = new FloatValue((float) 1.41421356);
+
+		float_from.copyTo(float_to);
+		assertEquals(float_from, float_to);
+
+		IntValue int_from = new IntValue(8191);
+		IntValue int_to = new IntValue(131071);
+
+		int_from.copyTo(int_to);
+		assertEquals(int_from, int_to);
+
+		LongValue long_from = new LongValue(524287);
+		LongValue long_to = new LongValue(2147483647);
+
+		long_from.copyTo(long_to);
+		assertEquals(long_from, long_to);
+
+		NullValue null_from = new NullValue();
+		NullValue null_to = new NullValue();
+
+		null_from.copyTo(null_to);
+		assertEquals(null_from, null_to);
+
+		ShortValue short_from = new ShortValue((short) 31);
+		ShortValue short_to = new ShortValue((short) 127);
+
+		short_from.copyTo(short_to);
+		assertEquals(short_from, short_to);
+
+		StringValue string_from = new StringValue("2305843009213693951");
+		StringValue string_to = new StringValue("618970019642690137449562111");
+
+		string_from.copyTo(string_to);
+		assertEquals(string_from, string_to);
+	}
+}