You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/11/19 12:20:12 UTC

incubator-flink git commit: Add byte array serialization to InstantiationUtil

Repository: incubator-flink
Updated Branches:
  refs/heads/master f84e4ecdf -> da6063045


Add byte array serialization to InstantiationUtil


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/da606304
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/da606304
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/da606304

Branch: refs/heads/master
Commit: da60630455c55cd9b238313999f3ad3e2ac0009d
Parents: f84e4ec
Author: uce <u....@fu-berlin.de>
Authored: Thu Jul 3 16:35:48 2014 +0200
Committer: uce <uc...@apache.org>
Committed: Wed Nov 19 12:05:39 2014 +0100

----------------------------------------------------------------------
 .../apache/flink/util/InstantiationUtil.java    | 33 +++++++-
 .../flink/util/InstantiationUtilTest.java       | 84 ++++++++++++++++++++
 .../flink/util/InstantiationUtilsTest.java      | 68 ----------------
 .../api/java/io/CollectionInputFormat.java      |  4 +-
 4 files changed, 116 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da606304/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index f0c2d57..c7088f5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -18,8 +18,15 @@
 
 package org.apache.flink.util;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -28,8 +35,6 @@ import java.io.ObjectStreamClass;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Modifier;
 
-import org.apache.flink.configuration.Configuration;
-
 /**
  * Utility class to create instances from class objects and checking failure reasons.
  */
@@ -235,6 +240,30 @@ public class InstantiationUtil {
 		byte[] bytes = serializeObject(o);
 		config.setBytes(key, bytes);
 	}
+
+	public static <T> byte[] serializeToByteArray(TypeSerializer<T> serializer, T record) throws IOException {
+		if (record == null) {
+			throw new NullPointerException("Record to serialize to byte array must not be null.");
+		}
+
+		ByteArrayOutputStream bos = new ByteArrayOutputStream(64);
+		OutputViewDataOutputStreamWrapper outputViewWrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(bos));
+
+		serializer.serialize(record, outputViewWrapper);
+
+		return bos.toByteArray();
+	}
+
+	public static <T> T deserializeFromByteArray(TypeSerializer<T> serializer, byte[] buf) throws IOException {
+		if (buf == null) {
+			throw new NullPointerException("Byte array to deserialize from must not be null.");
+		}
+
+		InputViewDataInputStreamWrapper inputViewWrapper = new InputViewDataInputStreamWrapper(new DataInputStream(new ByteArrayInputStream(buf)));
+
+		T record = serializer.createInstance();
+		return serializer.deserialize(record, inputViewWrapper);
+	}
 	
 	public static Object deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException {
 		ObjectInputStream oois = null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da606304/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
new file mode 100644
index 0000000..bf4fc8c
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.api.common.typeutils.base.DoubleValueSerializer;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class InstantiationUtilTest {
+
+	@Test
+	public void testInstantiationOfStringValue() {
+		StringValue stringValue = InstantiationUtil.instantiate(
+				StringValue.class, null);
+		assertNotNull(stringValue);
+	}
+
+	@Test
+	public void testInstantiationOfStringValueAndCastToValue() {
+		StringValue stringValue = InstantiationUtil.instantiate(
+				StringValue.class, Value.class);
+		assertNotNull(stringValue);
+	}
+
+	@Test
+	public void testHasNullaryConstructor() {
+		assertTrue(InstantiationUtil
+				.hasPublicNullaryConstructor(StringValue.class));
+	}
+
+	@Test
+	public void testClassIsProper() {
+		assertTrue(InstantiationUtil.isProperClass(StringValue.class));
+	}
+
+	@Test
+	public void testClassIsNotProper() {
+		assertFalse(InstantiationUtil.isProperClass(Value.class));
+	}
+
+	@Test(expected = RuntimeException.class)
+	public void testCheckForInstantiationOfPrivateClass() {
+		InstantiationUtil.checkForInstantiation(TestClass.class);
+	}
+
+	@Test
+	public void testSerializationToByteArray() throws IOException {
+		final DoubleValue toSerialize = new DoubleValue(Math.random());
+		final DoubleValueSerializer serializer = new DoubleValueSerializer();
+
+		byte[] serialized = InstantiationUtil.serializeToByteArray(serializer, toSerialize);
+
+		DoubleValue deserialized = InstantiationUtil.deserializeFromByteArray(serializer, serialized);
+
+		assertEquals("Serialized record is not equal after serialization.", toSerialize, deserialized);
+	}
+
+	private class TestClass {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da606304/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilsTest.java
deleted file mode 100644
index 1556bf2..0000000
--- a/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilsTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Test;
-
-public class InstantiationUtilsTest {
-
-	@Test
-	public void testInstatiationOfStringValue() {
-		StringValue stringValue = InstantiationUtil.instantiate(
-				StringValue.class, null);
-		assertNotNull(stringValue);
-	}
-
-	@Test
-	public void testInstatiationOfStringValueAndCastToValue() {
-		StringValue stringValue = InstantiationUtil.instantiate(
-				StringValue.class, Value.class);
-		assertNotNull(stringValue);
-	}
-
-	@Test
-	public void testHasNullaryConstructor() {
-		assertTrue(InstantiationUtil
-				.hasPublicNullaryConstructor(StringValue.class));
-	}
-
-	@Test
-	public void testClassIsProper() {
-		assertTrue(InstantiationUtil.isProperClass(StringValue.class));
-	}
-
-	@Test
-	public void testClassIsNotProper() {
-		assertFalse(InstantiationUtil.isProperClass(Value.class));
-	}
-
-	@Test(expected = RuntimeException.class)
-	public void testCheckForInstantiationOfPrivateClass() {
-		InstantiationUtil.checkForInstantiation(TestClass.class);
-	}
-
-	private class TestClass {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da606304/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index 77ba666..89adf96 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -44,10 +44,9 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 	private TypeSerializer<T> serializer;
 
 	private transient Collection<T> dataSet; // input data as collection. transient, because it will be serialized in a custom way
-	
+
 	private transient Iterator<T> iterator;
 
-	
 	public CollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer) {
 		if (dataSet == null) {
 			throw new NullPointerException();
@@ -58,7 +57,6 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		this.dataSet = dataSet;
 	}
 
-	
 	@Override
 	public boolean reachedEnd() throws IOException {
 		return !this.iterator.hasNext();