You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/13 20:01:40 UTC

[1/2] flink git commit: [FLINK-2011] [runtime] Improve error message when user-defined serialization logic is erroneous

Repository: flink
Updated Branches:
  refs/heads/master 48e21a1ae -> 113b20b7f


[FLINK-2011] [runtime] Improve error message when user-defined serialization logic is erroneous


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/113b20b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/113b20b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/113b20b7

Branch: refs/heads/master
Commit: 113b20b7f8717b12c5f0dfa691da582d426fbae0
Parents: 9da2f1f
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 13 15:27:00 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 13 18:26:26 2015 +0200

----------------------------------------------------------------------
 .../api/reader/AbstractRecordReader.java        | 29 ++++++++++++++------
 ...llingAdaptiveSpanningRecordDeserializer.java |  1 -
 2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/113b20b7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 4ee7fad..bf43c72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -79,16 +79,27 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 				currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
 				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
 			}
-			else if (handleEvent(bufferOrEvent.getEvent())) {
-				if (inputGate.isFinished()) {
-					isFinished = true;
-
-					return false;
+			else {
+				// sanity check for leftover data in deserializers. events should only come between
+				// records, not in the middle of a fragment
+				if (recordDeserializers[bufferOrEvent.getChannelIndex()].hasUnfinishedData()) {
+					throw new IllegalStateException(
+							"Received an event in channel " + bufferOrEvent.getChannelIndex() + " while still having "
+							+ "data from a record. This indicates broken serialization logic. "
+							+ "If you are using custom serialization code (Writable or Value types), check their "
+							+ "serialization routines. In the case of Kryo, check the respective Kryo serializer.");
+				}
+				
+				if (handleEvent(bufferOrEvent.getEvent())) {
+					if (inputGate.isFinished()) {
+						isFinished = true;
+						return false;
+					}
+					else if (hasReachedEndOfSuperstep()) {
+						return false;
+					}
+					// else: More data is coming...
 				}
-				else if (hasReachedEndOfSuperstep()) {
-
-					return false;
-				} // else: More data is coming...
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/113b20b7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index eae3e65..453d448 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.util.StringUtils;


[2/2] flink git commit: [FLINK-2006] [runtime] Fix testing TaskManager task status request to stabilize tests.

Posted by se...@apache.org.
[FLINK-2006] [runtime] Fix testing TaskManager task status request to stabilize tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9da2f1f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9da2f1f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9da2f1f7

Branch: refs/heads/master
Commit: 9da2f1f725473f1242067acb546607888e3a5015
Parents: 48e21a1
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 13 14:23:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 13 18:26:26 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/testingUtils/TestingTaskManager.scala   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9da2f1f7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 5318254..220e6ca 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -71,8 +71,8 @@ class TestingTaskManager(config: TaskManagerConfiguration,
   def receiveTestMessages: Receive = {
     case NotifyWhenTaskIsRunning(executionID) => {
       Option(runningTasks.get(executionID)) match {
-        case Some(_) => sender ! true
-        case None =>
+        case Some(task) if task.getExecutionState == ExecutionState.RUNNING => sender ! true
+        case _ =>
           val listeners = waitForRunning.getOrElse(executionID, Set())
           waitForRunning += (executionID -> (listeners + sender))
       }