You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/06/09 08:58:10 UTC

[1/2] flink git commit: [hotfix] Fix JSONDeserializationSchema for Kafka

Repository: flink
Updated Branches:
  refs/heads/master fce64e193 -> b5e2e3637


[hotfix] Fix JSONDeserializationSchema for Kafka


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/042ad7b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/042ad7b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/042ad7b9

Branch: refs/heads/master
Commit: 042ad7b9020dffef5c6861d41170a6e96579cd1f
Parents: fce64e1
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Jun 9 10:56:09 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 9 10:56:45 2016 +0200

----------------------------------------------------------------------
 .../util/serialization/JSONDeserializationSchema.java     | 10 ++--------
 .../connectors/kafka/JSONDeserializationSchemaTest.java   |  2 +-
 2 files changed, 3 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/042ad7b9/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
index 49e9da8..d170058 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
@@ -18,22 +18,20 @@ package org.apache.flink.streaming.util.serialization;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import java.io.IOException;
 
-import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
 
 /**
  * DeserializationSchema that deserializes a JSON String into an ObjectNode.
  * <p>
  * Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
  */
-public class JSONDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
+public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
 	private ObjectMapper mapper;
 
 	@Override
-	public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+	public ObjectNode deserialize(byte[] message) throws IOException {
 		if (mapper == null) {
 			mapper = new ObjectMapper();
 		}
@@ -45,8 +43,4 @@ public class JSONDeserializationSchema implements KeyedDeserializationSchema<Obj
 		return false;
 	}
 
-	@Override
-	public TypeInformation<ObjectNode> getProducedType() {
-		return getForClass(ObjectNode.class);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/042ad7b9/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
index f8e3fd1..1882a7e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
@@ -33,7 +33,7 @@ public class JSONDeserializationSchemaTest {
 		byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
 
 		JSONDeserializationSchema schema = new JSONDeserializationSchema();
-		ObjectNode deserializedValue = schema.deserialize(null, serializedValue, "", 0, 0);
+		ObjectNode deserializedValue = schema.deserialize(serializedValue);
 
 		Assert.assertEquals(4, deserializedValue.get("key").asInt());
 		Assert.assertEquals("world", deserializedValue.get("value").asText());


[2/2] flink git commit: [hotfix] Improve ParameterTool exception

Posted by rm...@apache.org.
[hotfix] Improve ParameterTool exception

This closes #2057


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5e2e363
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5e2e363
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5e2e363

Branch: refs/heads/master
Commit: b5e2e3637a85bd690d2e2c0281d4c210cce1d95c
Parents: 042ad7b
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Jun 9 10:56:27 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 9 10:56:53 2016 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/api/java/utils/ParameterTool.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5e2e363/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 6be78e2..46c7387 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -152,7 +152,7 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 	public static ParameterTool fromPropertiesFile(String path) throws IOException {
 		File propertiesFile = new File(path);
 		if(!propertiesFile.exists()) {
-			throw new FileNotFoundException("Properties file "+path+" does not exist");
+			throw new FileNotFoundException("Properties file " + propertiesFile.getAbsolutePath() + " does not exist");
 		}
 		Properties props = new Properties();
 		FileInputStream fis = new FileInputStream(propertiesFile);