You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/11/19 12:20:12 UTC
incubator-flink git commit: Add byte array serialization to
InstantiationUtil
Repository: incubator-flink
Updated Branches:
refs/heads/master f84e4ecdf -> da6063045
Add byte array serialization to InstantiationUtil
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/da606304
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/da606304
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/da606304
Branch: refs/heads/master
Commit: da60630455c55cd9b238313999f3ad3e2ac0009d
Parents: f84e4ec
Author: uce <u....@fu-berlin.de>
Authored: Thu Jul 3 16:35:48 2014 +0200
Committer: uce <uc...@apache.org>
Committed: Wed Nov 19 12:05:39 2014 +0100
----------------------------------------------------------------------
.../apache/flink/util/InstantiationUtil.java | 33 +++++++-
.../flink/util/InstantiationUtilTest.java | 84 ++++++++++++++++++++
.../flink/util/InstantiationUtilsTest.java | 68 ----------------
.../api/java/io/CollectionInputFormat.java | 4 +-
4 files changed, 116 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da606304/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index f0c2d57..c7088f5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -18,8 +18,15 @@
package org.apache.flink.util;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -28,8 +35,6 @@ import java.io.ObjectStreamClass;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
-import org.apache.flink.configuration.Configuration;
-
/**
* Utility class to create instances from class objects and checking failure reasons.
*/
@@ -235,6 +240,30 @@ public class InstantiationUtil {
byte[] bytes = serializeObject(o);
config.setBytes(key, bytes);
}
+
+ public static <T> byte[] serializeToByteArray(TypeSerializer<T> serializer, T record) throws IOException {
+ if (record == null) {
+ throw new NullPointerException("Record to serialize to byte array must not be null.");
+ }
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(64);
+ OutputViewDataOutputStreamWrapper outputViewWrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(bos));
+
+ serializer.serialize(record, outputViewWrapper);
+
+ return bos.toByteArray();
+ }
+
+ public static <T> T deserializeFromByteArray(TypeSerializer<T> serializer, byte[] buf) throws IOException {
+ if (buf == null) {
+ throw new NullPointerException("Byte array to deserialize from must not be null.");
+ }
+
+ InputViewDataInputStreamWrapper inputViewWrapper = new InputViewDataInputStreamWrapper(new DataInputStream(new ByteArrayInputStream(buf)));
+
+ T record = serializer.createInstance();
+ return serializer.deserialize(record, inputViewWrapper);
+ }
public static Object deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException {
ObjectInputStream oois = null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da606304/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
new file mode 100644
index 0000000..bf4fc8c
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.api.common.typeutils.base.DoubleValueSerializer;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class InstantiationUtilTest {
+
+ @Test
+ public void testInstantiationOfStringValue() {
+ StringValue stringValue = InstantiationUtil.instantiate(
+ StringValue.class, null);
+ assertNotNull(stringValue);
+ }
+
+ @Test
+ public void testInstantiationOfStringValueAndCastToValue() {
+ StringValue stringValue = InstantiationUtil.instantiate(
+ StringValue.class, Value.class);
+ assertNotNull(stringValue);
+ }
+
+ @Test
+ public void testHasNullaryConstructor() {
+ assertTrue(InstantiationUtil
+ .hasPublicNullaryConstructor(StringValue.class));
+ }
+
+ @Test
+ public void testClassIsProper() {
+ assertTrue(InstantiationUtil.isProperClass(StringValue.class));
+ }
+
+ @Test
+ public void testClassIsNotProper() {
+ assertFalse(InstantiationUtil.isProperClass(Value.class));
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testCheckForInstantiationOfPrivateClass() {
+ InstantiationUtil.checkForInstantiation(TestClass.class);
+ }
+
+ @Test
+ public void testSerializationToByteArray() throws IOException {
+ final DoubleValue toSerialize = new DoubleValue(Math.random());
+ final DoubleValueSerializer serializer = new DoubleValueSerializer();
+
+ byte[] serialized = InstantiationUtil.serializeToByteArray(serializer, toSerialize);
+
+ DoubleValue deserialized = InstantiationUtil.deserializeFromByteArray(serializer, serialized);
+
+ assertEquals("Serialized record is not equal after serialization.", toSerialize, deserialized);
+ }
+
+ private class TestClass {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da606304/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilsTest.java
deleted file mode 100644
index 1556bf2..0000000
--- a/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilsTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Test;
-
-public class InstantiationUtilsTest {
-
- @Test
- public void testInstatiationOfStringValue() {
- StringValue stringValue = InstantiationUtil.instantiate(
- StringValue.class, null);
- assertNotNull(stringValue);
- }
-
- @Test
- public void testInstatiationOfStringValueAndCastToValue() {
- StringValue stringValue = InstantiationUtil.instantiate(
- StringValue.class, Value.class);
- assertNotNull(stringValue);
- }
-
- @Test
- public void testHasNullaryConstructor() {
- assertTrue(InstantiationUtil
- .hasPublicNullaryConstructor(StringValue.class));
- }
-
- @Test
- public void testClassIsProper() {
- assertTrue(InstantiationUtil.isProperClass(StringValue.class));
- }
-
- @Test
- public void testClassIsNotProper() {
- assertFalse(InstantiationUtil.isProperClass(Value.class));
- }
-
- @Test(expected = RuntimeException.class)
- public void testCheckForInstantiationOfPrivateClass() {
- InstantiationUtil.checkForInstantiation(TestClass.class);
- }
-
- private class TestClass {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da606304/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index 77ba666..89adf96 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -44,10 +44,9 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
private TypeSerializer<T> serializer;
private transient Collection<T> dataSet; // input data as collection. transient, because it will be serialized in a custom way
-
+
private transient Iterator<T> iterator;
-
public CollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer) {
if (dataSet == null) {
throw new NullPointerException();
@@ -58,7 +57,6 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
this.dataSet = dataSet;
}
-
@Override
public boolean reachedEnd() throws IOException {
return !this.iterator.hasNext();