You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/29 14:08:36 UTC
[5/8] flink git commit: [FLINK-2723] [core] CopyableValue method to
copy into new instance
[FLINK-2723] [core] CopyableValue method to copy into new instance
This closes #1169
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e727355e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e727355e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e727355e
Branch: refs/heads/master
Commit: e727355e42bd0ad7d403aee703aaf33a68a839d2
Parents: 40cbf7e
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Sep 21 15:14:09 2015 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 29 12:18:07 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/types/BooleanValue.java | 6 +-
.../java/org/apache/flink/types/ByteValue.java | 7 +-
.../java/org/apache/flink/types/CharValue.java | 7 +-
.../org/apache/flink/types/CopyableValue.java | 30 ++++-
.../org/apache/flink/types/DoubleValue.java | 7 +-
.../java/org/apache/flink/types/FloatValue.java | 7 +-
.../java/org/apache/flink/types/IntValue.java | 9 +-
.../java/org/apache/flink/types/LongValue.java | 7 +-
.../java/org/apache/flink/types/NullValue.java | 7 +-
.../java/org/apache/flink/types/Record.java | 5 +
.../java/org/apache/flink/types/ShortValue.java | 7 +-
.../org/apache/flink/types/StringValue.java | 7 +-
.../apache/flink/types/CopyableValueTest.java | 109 +++++++++++++++++++
13 files changed, 203 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
index 071e650..e034648 100644
--- a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
@@ -48,7 +48,6 @@ public class BooleanValue implements NormalizableKey<BooleanValue>, ResettableVa
this.value = value;
}
-
public boolean get() {
return value;
}
@@ -118,6 +117,11 @@ public class BooleanValue implements NormalizableKey<BooleanValue>, ResettableVa
}
@Override
+ public BooleanValue copy() {
+ return new BooleanValue(this.value);
+ }
+
+ @Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 1);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
index c2f1f10..40ed1ad 100644
--- a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
@@ -147,13 +147,18 @@ public class ByteValue implements NormalizableKey<ByteValue>, ResettableValue<By
public int getBinaryLength() {
return 1;
}
-
+
@Override
public void copyTo(ByteValue target) {
target.value = this.value;
}
@Override
+ public ByteValue copy() {
+ return new ByteValue(this.value);
+ }
+
+ @Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 1);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/CharValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/CharValue.java b/flink-core/src/main/java/org/apache/flink/types/CharValue.java
index 3fd9f29..06b67c7 100644
--- a/flink-core/src/main/java/org/apache/flink/types/CharValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/CharValue.java
@@ -151,13 +151,18 @@ public class CharValue implements NormalizableKey<CharValue>, ResettableValue<Ch
public int getBinaryLength() {
return 2;
}
-
+
@Override
public void copyTo(CharValue target) {
target.value = this.value;
}
@Override
+ public CharValue copy() {
+ return new CharValue(this.value);
+ }
+
+ @Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 2);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java b/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java
index 57e0c46..3974cb2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java
@@ -35,7 +35,35 @@ public interface CopyableValue<T> extends Value {
*/
int getBinaryLength();
+ /**
+ * Performs a deep copy of this object into the {@code target} instance.
+ *
+ * @param target Object to copy into.
+ */
void copyTo(T target);
-
+
+ /**
+ * Performs a deep copy of this object into a new instance.
+ *
+ * This method is useful for generic user-defined functions to clone a
+ * {@link CopyableValue} when storing multiple objects. With object reuse
+ * a deep copy must be created and type erasure prevents calling new.
+ *
+ * @return New object with copied fields.
+ */
+ T copy();
+
+ /**
+ * Copies the next serialized instance from {@code source} to {@code target}.
+ *
+ * This method is equivalent to calling {@code IOReadableWritable.read(DataInputView)}
+ * followed by {@code IOReadableWritable.write(DataOutputView)} but does not require
+ * intermediate deserialization.
+ *
+ * @param source Data source for serialized instance.
+ * @param target Data target for serialized instance.
+ *
+ * @see org.apache.flink.core.io.IOReadableWritable
+ */
void copy(DataInputView source, DataOutputView target) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
index abcbcf5..3158e40 100644
--- a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
@@ -122,13 +122,18 @@ public class DoubleValue implements Key<DoubleValue>, ResettableValue<DoubleValu
public int getBinaryLength() {
return 8;
}
-
+
@Override
public void copyTo(DoubleValue target) {
target.value = this.value;
}
@Override
+ public DoubleValue copy() {
+ return new DoubleValue(this.value);
+ }
+
+ @Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 8);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
index d57c74b..5364203 100644
--- a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
@@ -121,13 +121,18 @@ public class FloatValue implements Key<FloatValue>, ResettableValue<FloatValue>,
public int getBinaryLength() {
return 4;
}
-
+
@Override
public void copyTo(FloatValue target) {
target.value = this.value;
}
@Override
+ public FloatValue copy() {
+ return new FloatValue(this.value);
+ }
+
+ @Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 4);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/IntValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/IntValue.java b/flink-core/src/main/java/org/apache/flink/types/IntValue.java
index 423c8c1..1b893f0 100644
--- a/flink-core/src/main/java/org/apache/flink/types/IntValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/IntValue.java
@@ -139,7 +139,7 @@ public class IntValue implements NormalizableKey<IntValue>, ResettableValue<IntV
}
}
else {
- target.putIntBigEndian(offset, value - Integer.MIN_VALUE);
+ target.putIntBigEndian(offset, value - Integer.MIN_VALUE);
for (int i = 4; i < len; i++) {
target.put(offset + i, (byte) 0);
}
@@ -152,13 +152,18 @@ public class IntValue implements NormalizableKey<IntValue>, ResettableValue<IntV
public int getBinaryLength() {
return 4;
}
-
+
@Override
public void copyTo(IntValue target) {
target.value = this.value;
}
@Override
+ public IntValue copy() {
+ return new IntValue(this.value);
+ }
+
+ @Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 4);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/LongValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/LongValue.java b/flink-core/src/main/java/org/apache/flink/types/LongValue.java
index e8fcd53..2b6cb1f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/LongValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/LongValue.java
@@ -160,13 +160,18 @@ public class LongValue implements NormalizableKey<LongValue>, ResettableValue<Lo
public int getBinaryLength() {
return 8;
}
-
+
@Override
public void copyTo(LongValue target) {
target.value = this.value;
}
@Override
+ public LongValue copy() {
+ return new LongValue(this.value);
+ }
+
+ @Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 8);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/NullValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullValue.java b/flink-core/src/main/java/org/apache/flink/types/NullValue.java
index 5391b7b..aa56536 100644
--- a/flink-core/src/main/java/org/apache/flink/types/NullValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NullValue.java
@@ -110,12 +110,17 @@ public final class NullValue implements NormalizableKey<NullValue>, CopyableValu
public int getBinaryLength() {
return 1;
}
-
+
@Override
public void copyTo(NullValue target) {
}
@Override
+ public NullValue copy() {
+ return NullValue.getInstance();
+ }
+
+ @Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
source.readBoolean();
target.writeBoolean(false);
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/Record.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java b/flink-core/src/main/java/org/apache/flink/types/Record.java
index 218d6ce..24ff979 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Record.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Record.java
@@ -763,6 +763,11 @@ public final class Record implements Value, CopyableValue<Record> {
}
@Override
+ public Record copy() {
+ return createCopy();
+ }
+
+ @Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
int val = source.readUnsignedByte();
target.writeByte(val);
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
index ec03497..f18ce7f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
@@ -154,13 +154,18 @@ public class ShortValue implements NormalizableKey<ShortValue>, ResettableValue<
public int getBinaryLength() {
return 2;
}
-
+
@Override
public void copyTo(ShortValue target) {
target.value = this.value;
}
@Override
+ public ShortValue copy() {
+ return new ShortValue(this.value);
+ }
+
+ @Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 2);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/StringValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
index db0f184..2249019 100644
--- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
@@ -682,7 +682,12 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
target.ensureSize(this.len);
System.arraycopy(this.value, 0, target.value, 0, this.len);
}
-
+
+ @Override
+ public StringValue copy() {
+ return new StringValue(this);
+ }
+
@Override
public void copy(DataInputView in, DataOutputView target) throws IOException {
int len = in.readUnsignedByte();
http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
new file mode 100644
index 0000000..76bdece
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class CopyableValueTest {
+
+ @Test
+ public void testCopy() {
+ CopyableValue<?>[] value_types = new CopyableValue[] {
+ new BooleanValue(true),
+ new ByteValue((byte) 42),
+ new CharValue('q'),
+ new DoubleValue(3.1415926535897932),
+ new FloatValue((float) 3.14159265),
+ new IntValue(42),
+ new LongValue(42l),
+ new NullValue(),
+ new ShortValue((short) 42),
+ new StringValue("QED")
+ };
+
+ for (CopyableValue<?> type : value_types) {
+ assertEquals(type, type.copy());
+ }
+ }
+
+ @Test
+ public void testCopyTo() {
+ BooleanValue boolean_from = new BooleanValue(true);
+ BooleanValue boolean_to = new BooleanValue(false);
+
+ boolean_from.copyTo(boolean_to);
+ assertEquals(boolean_from, boolean_to);
+
+ ByteValue byte_from = new ByteValue((byte) 3);
+ ByteValue byte_to = new ByteValue((byte) 7);
+
+ byte_from.copyTo(byte_to);
+ assertEquals(byte_from, byte_to);
+
+ CharValue char_from = new CharValue('α');
+ CharValue char_to = new CharValue('ω');
+
+ char_from.copyTo(char_to);
+ assertEquals(char_from, char_to);
+
+ DoubleValue double_from = new DoubleValue(2.7182818284590451);
+ DoubleValue double_to = new DoubleValue(0);
+
+ double_from.copyTo(double_to);
+ assertEquals(double_from, double_to);
+
+ FloatValue float_from = new FloatValue((float) 2.71828182);
+ FloatValue float_to = new FloatValue((float) 1.41421356);
+
+ float_from.copyTo(float_to);
+ assertEquals(float_from, float_to);
+
+ IntValue int_from = new IntValue(8191);
+ IntValue int_to = new IntValue(131071);
+
+ int_from.copyTo(int_to);
+ assertEquals(int_from, int_to);
+
+ LongValue long_from = new LongValue(524287);
+ LongValue long_to = new LongValue(2147483647);
+
+ long_from.copyTo(long_to);
+ assertEquals(long_from, long_to);
+
+ NullValue null_from = new NullValue();
+ NullValue null_to = new NullValue();
+
+ null_from.copyTo(null_to);
+ assertEquals(null_from, null_to);
+
+ ShortValue short_from = new ShortValue((short) 31);
+ ShortValue short_to = new ShortValue((short) 127);
+
+ short_from.copyTo(short_to);
+ assertEquals(short_from, short_to);
+
+ StringValue string_from = new StringValue("2305843009213693951");
+ StringValue string_to = new StringValue("618970019642690137449562111");
+
+ string_from.copyTo(string_to);
+ assertEquals(string_from, string_to);
+ }
+}