You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/08/31 15:30:08 UTC

flink git commit: [core][runtime] move SerializedValueTest from runtime to core

Repository: flink
Updated Branches:
  refs/heads/master ac9a91172 -> 6b0c0e4c0


[core][runtime] move SerializedValueTest from runtime to core

- move createCopySerializable to core's CommonTestUtils
- rename CommonTestUtils createCopy to createCopyWritable
- adapt the tests to use core's CommonTestUtils where applicable

This closes #1081.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b0c0e4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b0c0e4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b0c0e4c

Branch: refs/heads/master
Commit: 6b0c0e4c00f40bd3159ff21fc74463ab983bfe6e
Parents: ac9a911
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Aug 31 14:04:58 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Aug 31 15:29:28 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigurationTest.java  |  2 +-
 .../flink/core/testutils/CommonTestUtils.java   | 32 ++++++++-
 .../org/apache/flink/util/AbstractIDTest.java   | 22 +++---
 .../apache/flink/util/SerializedValueTest.java  | 72 +++++++++++++++++++
 .../apache/flink/runtime/blob/BlobKeyTest.java  |  2 +-
 .../SerializedJobExecutionResultTest.java       |  2 +-
 .../TaskDeploymentDescriptorTest.java           |  2 +-
 .../flink/runtime/event/task/TaskEventTest.java |  2 +-
 .../instance/InstanceConnectionInfoTest.java    |  2 +-
 .../flink/runtime/jobgraph/JobGraphTest.java    |  2 +-
 .../messages/CheckpointMessagesTest.java        |  2 +-
 .../taskmanager/TaskExecutionStateTest.java     |  2 +-
 .../runtime/testutils/CommonTestUtils.java      | 76 --------------------
 .../runtime/util/SerializedThrowableTest.java   |  2 +-
 .../flink/runtime/util/SerializedValueTest.java | 73 -------------------
 .../flink/streaming/api/SourceFunctionTest.java |  3 +-
 .../api/functions/FromElementsFunctionTest.java |  2 +-
 .../TypeInformationSerializationSchemaTest.java |  2 +-
 18 files changed, 127 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index e3b75b6..5f0947d 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -54,7 +54,7 @@ public class ConfigurationTest extends TestLogger {
 			orig.setBytes("bytes sequence", new byte[] { 1, 2, 3, 4, 5 } );
 			orig.setClass("myclass", this.getClass());
 	
-			final Configuration copy = (Configuration) CommonTestUtils.createCopy(orig);
+			final Configuration copy = CommonTestUtils.createCopyWritable(orig);
 			assertEquals("myvalue", copy.getString("mykey", "null"));
 			assertEquals(100, copy.getInteger("mynumber", 0));
 			assertEquals(478236947162389746L, copy.getLong("longvalue", 0L));

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
index c876940..5b7afaa 100644
--- a/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -27,6 +27,8 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -98,7 +100,7 @@ public class CommonTestUtils {
 	 *         thrown if an error occurs while creating the copy of the object
 	 */
 	@SuppressWarnings("unchecked")
-	public static <T extends IOReadableWritable> T createCopy(final T original) throws IOException {
+	public static <T extends IOReadableWritable> T createCopyWritable(final T original) throws IOException {
 
 		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		final DataOutputStream dos = new DataOutputStream(baos);
@@ -142,4 +144,32 @@ public class CommonTestUtils {
 
 		return copy;
 	}
+
+	public static <T extends java.io.Serializable> T createCopySerializable(T original) throws IOException {
+		if (original == null) {
+			throw new IllegalArgumentException();
+		}
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		ObjectOutputStream oos = new ObjectOutputStream(baos);
+		oos.writeObject(original);
+		oos.close();
+		baos.close();
+
+		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+		ObjectInputStream ois = new ObjectInputStream(bais);
+
+		T copy;
+		try {
+			copy = (T) ois.readObject();
+		}
+		catch (ClassNotFoundException e) {
+			throw new IOException(e);
+		}
+
+		ois.close();
+		bais.close();
+
+		return copy;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java b/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java
index 8134a68..ba7d60c 100644
--- a/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java
@@ -36,7 +36,7 @@ public class AbstractIDTest extends TestLogger {
 	public void testSerialization() {
 		final AbstractID origID = new AbstractID();
 		try {
-			final AbstractID copyID = CommonTestUtils.createCopy(origID);
+			final AbstractID copyID = CommonTestUtils.createCopyWritable(origID);
 
 			assertEquals(origID.hashCode(), copyID.hashCode());
 			assertEquals(origID, copyID);
@@ -83,16 +83,16 @@ public class AbstractIDTest extends TestLogger {
 			AbstractID id10 = new AbstractID(Long.MIN_VALUE, Long.MAX_VALUE);
 			
 			// test self equality
-			assertEquals(0, id1.compareTo(CommonTestUtils.createCopy(id1)));
-			assertEquals(0, id2.compareTo(CommonTestUtils.createCopy(id2)));
-			assertEquals(0, id3.compareTo(CommonTestUtils.createCopy(id3)));
-			assertEquals(0, id4.compareTo(CommonTestUtils.createCopy(id4)));
-			assertEquals(0, id5.compareTo(CommonTestUtils.createCopy(id5)));
-			assertEquals(0, id6.compareTo(CommonTestUtils.createCopy(id6)));
-			assertEquals(0, id7.compareTo(CommonTestUtils.createCopy(id7)));
-			assertEquals(0, id8.compareTo(CommonTestUtils.createCopy(id8)));
-			assertEquals(0, id9.compareTo(CommonTestUtils.createCopy(id9)));
-			assertEquals(0, id10.compareTo(CommonTestUtils.createCopy(id10)));
+			assertEquals(0, id1.compareTo(CommonTestUtils.createCopyWritable(id1)));
+			assertEquals(0, id2.compareTo(CommonTestUtils.createCopyWritable(id2)));
+			assertEquals(0, id3.compareTo(CommonTestUtils.createCopyWritable(id3)));
+			assertEquals(0, id4.compareTo(CommonTestUtils.createCopyWritable(id4)));
+			assertEquals(0, id5.compareTo(CommonTestUtils.createCopyWritable(id5)));
+			assertEquals(0, id6.compareTo(CommonTestUtils.createCopyWritable(id6)));
+			assertEquals(0, id7.compareTo(CommonTestUtils.createCopyWritable(id7)));
+			assertEquals(0, id8.compareTo(CommonTestUtils.createCopyWritable(id8)));
+			assertEquals(0, id9.compareTo(CommonTestUtils.createCopyWritable(id9)));
+			assertEquals(0, id10.compareTo(CommonTestUtils.createCopyWritable(id10)));
 			
 			// test order
 			assertCompare(id1, id2, -1);

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java b/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java
new file mode 100644
index 0000000..fda368a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+public class SerializedValueTest {
+
+	@Test
+	public void testSimpleValue() {
+		try {
+			final String value = "teststring";
+
+			SerializedValue<String> v = new SerializedValue<String>(value);
+			SerializedValue<String> copy = CommonTestUtils.createCopySerializable(v);
+
+			assertEquals(value, v.deserializeValue(getClass().getClassLoader()));
+			assertEquals(value, copy.deserializeValue(getClass().getClassLoader()));
+
+			assertEquals(v, copy);
+			assertEquals(v.hashCode(), copy.hashCode());
+
+			assertNotNull(v.toString());
+			assertNotNull(copy.toString());
+
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testNullValue() {
+		try {
+			SerializedValue<Object> v = new SerializedValue<Object>(null);
+			SerializedValue<Object> copy = CommonTestUtils.createCopySerializable(v);
+
+			assertNull(copy.deserializeValue(getClass().getClassLoader()));
+
+			assertEquals(v, copy);
+			assertEquals(v.hashCode(), copy.hashCode());
+			assertEquals(v.toString(), copy.toString());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 03d370e..6f873de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -27,7 +27,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.StringUtils;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
index a22ed13..b3bac58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 50b154e..3a36fe8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.RegularPactTask;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
 
 public class TaskDeploymentDescriptorTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
index b508923..d659b45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
index 2769183..bc8cd63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.fail;
 
 import java.net.InetAddress;
 
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index 9f88bd5..ca047e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.*;
 import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
 
 public class JobGraphTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 1e5b12a..68575e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
index b3f2456..f6c119e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
@@ -27,7 +27,7 @@ import java.io.PrintWriter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index ca05416..61b1f7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -45,82 +45,6 @@ import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 public class CommonTestUtils {
 
 	/**
-	 * Creates a copy of the given {@link IOReadableWritable} object by an in-memory serialization and subsequent
-	 * deserialization.
-	 * 
-	 * @param original
-	 *        the original object to be copied
-	 * @return the copy of original object created by the original object's serialization/deserialization methods
-	 * @throws IOException
-	 *         thrown if an error occurs while creating the copy of the object
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T extends IOReadableWritable> T createCopyWritable(final T original) throws IOException {
-
-		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		final DataOutputStream dos = new DataOutputStream(baos);
-
-		original.write(new OutputViewDataOutputStreamWrapper(dos));
-
-		final String className = original.getClass().getName();
-
-		Class<T> clazz = null;
-
-		try {
-			clazz = (Class<T>) Class.forName(className);
-		} catch (ClassNotFoundException e) {
-			fail(e.getMessage());
-		}
-
-		T copy = null;
-		try {
-			copy = clazz.newInstance();
-		} catch (Throwable t) {
-			t.printStackTrace();
-			fail(t.getMessage());
-		}
-
-		final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-		final DataInputStream dis = new DataInputStream(bais);
-
-		copy.read(new InputViewDataInputStreamWrapper(dis));
-		if (dis.available() > 0) {
-			throw new IOException("The coped result was not fully consumed.");
-		}
-
-		return copy;
-	}
-	
-	@SuppressWarnings("unchecked")
-	public static <T extends java.io.Serializable> T createCopySerializable(T original) throws IOException {
-		if (original == null) {
-			throw new IllegalArgumentException();
-		}
-		
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		ObjectOutputStream oos = new ObjectOutputStream(baos);
-		oos.writeObject(original);
-		oos.close();
-		baos.close();
-		
-		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-		ObjectInputStream ois = new ObjectInputStream(bais);
-		
-		T copy;
-		try {
-			copy = (T) ois.readObject();
-		}
-		catch (ClassNotFoundException e) {
-			throw new IOException(e);
-		}
-		
-		ois.close();
-		bais.close();
-		
-		return copy;
-	}
-
-	/**
 	 * Sleeps for a given set of milliseconds, uninterruptibly. If interrupt is called,
 	 * the sleep will continue nonetheless.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
index 3dca362..50efd52 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.core.memory.MemoryUtils;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java
deleted file mode 100644
index 0d19613..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-public class SerializedValueTest {
-
-	@Test
-	public void testSimpleValue() {
-		try {
-			final String value = "teststring";
-
-			SerializedValue<String> v = new SerializedValue<String>(value);
-			SerializedValue<String> copy = CommonTestUtils.createCopySerializable(v);
-
-			assertEquals(value, v.deserializeValue(getClass().getClassLoader()));
-			assertEquals(value, copy.deserializeValue(getClass().getClassLoader()));
-
-			assertEquals(v, copy);
-			assertEquals(v.hashCode(), copy.hashCode());
-
-			assertNotNull(v.toString());
-			assertNotNull(copy.toString());
-
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testNullValue() {
-		try {
-			SerializedValue<Object> v = new SerializedValue<Object>(null);
-			SerializedValue<Object> copy = CommonTestUtils.createCopySerializable(v);
-
-			assertNull(copy.deserializeValue(getClass().getClassLoader()));
-
-			assertEquals(v, copy);
-			assertEquals(v.hashCode(), copy.hashCode());
-			assertEquals(v.toString(), copy.toString());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
index f0fe63d..b53649a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
@@ -18,13 +18,12 @@
 package org.apache.flink.streaming.api;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
 
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.util.SourceFunctionUtil;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
index 9c3653b..41bd381 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.types.Value;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0c0e4c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
index 8c847d3..1c0f850 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
 import org.junit.Test;