You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/21 14:13:33 UTC

[4/6] flink git commit: [FLINK-2880] [streaming] Allow DeserializationSchema to forward exceptions.

[FLINK-2880] [streaming] Allow DeserializationSchema to forward exceptions.

This closes #1275


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a959bcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a959bcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a959bcb

Branch: refs/heads/master
Commit: 7a959bcb405e0e4e6655dc87824c4ab27da317c2
Parents: dd3264e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Oct 20 19:17:59 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 21 11:52:39 2015 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kafka/KafkaConsumerTestBase.java     | 7 +++++--
 .../streaming/util/serialization/DeserializationSchema.java   | 3 ++-
 2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a959bcb/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index e9a5728..ffb6818 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -76,6 +76,7 @@ import org.junit.Assert;
 import org.junit.Rule;
 import scala.collection.Seq;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1012,7 +1013,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 	private static void printTopic(String topicName, ConsumerConfig config,
 								DeserializationSchema<?> deserializationSchema,
-								int stopAfter) {
+								int stopAfter) throws IOException {
 
 		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
 		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
@@ -1023,7 +1024,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 	}
 
-	private static void printTopic(String topicName, int elements,DeserializationSchema<?> deserializer) {
+	private static void printTopic(String topicName, int elements,DeserializationSchema<?> deserializer) 
+			throws IOException
+	{
 		// write the sequence to log for debugging purposes
 		Properties stdProps = standardCC.props().props();
 		Properties newProps = new Properties(stdProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/7a959bcb/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index f0e4477..cd25e83 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -36,7 +37,7 @@ public interface DeserializationSchema<T> extends Serializable, ResultTypeQuerya
 	 * @param message The message, as a byte array.
 	 * @return The deserialized message as an object.
 	 */
-	T deserialize(byte[] message);
+	T deserialize(byte[] message) throws IOException;
 
 	/**
 	 * Method to decide whether the element signals the end of the stream. If