You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/06/09 08:58:10 UTC
[1/2] flink git commit: [hotfix] Fix JSONDeserializationSchema for Kafka
Repository: flink
Updated Branches:
refs/heads/master fce64e193 -> b5e2e3637
[hotfix] Fix JSONDeserializationSchema for Kafka
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/042ad7b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/042ad7b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/042ad7b9
Branch: refs/heads/master
Commit: 042ad7b9020dffef5c6861d41170a6e96579cd1f
Parents: fce64e1
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Jun 9 10:56:09 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 9 10:56:45 2016 +0200
----------------------------------------------------------------------
.../util/serialization/JSONDeserializationSchema.java | 10 ++--------
.../connectors/kafka/JSONDeserializationSchemaTest.java | 2 +-
2 files changed, 3 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/042ad7b9/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
index 49e9da8..d170058 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
@@ -18,22 +18,20 @@ package org.apache.flink.streaming.util.serialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
-import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
/**
* DeserializationSchema that deserializes a JSON String into an ObjectNode.
* <p>
* Fields can be accessed by calling objectNode.get(<name>).as(<type>)
*/
-public class JSONDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
+public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
private ObjectMapper mapper;
@Override
- public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+ public ObjectNode deserialize(byte[] message) throws IOException {
if (mapper == null) {
mapper = new ObjectMapper();
}
@@ -45,8 +43,4 @@ public class JSONDeserializationSchema implements KeyedDeserializationSchema<Obj
return false;
}
- @Override
- public TypeInformation<ObjectNode> getProducedType() {
- return getForClass(ObjectNode.class);
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/042ad7b9/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
index f8e3fd1..1882a7e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
@@ -33,7 +33,7 @@ public class JSONDeserializationSchemaTest {
byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
JSONDeserializationSchema schema = new JSONDeserializationSchema();
- ObjectNode deserializedValue = schema.deserialize(null, serializedValue, "", 0, 0);
+ ObjectNode deserializedValue = schema.deserialize(serializedValue);
Assert.assertEquals(4, deserializedValue.get("key").asInt());
Assert.assertEquals("world", deserializedValue.get("value").asText());
[2/2] flink git commit: [hotfix] Improve ParameterTool exception
Posted by rm...@apache.org.
[hotfix] Improve ParameterTool exception
This closes #2057
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5e2e363
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5e2e363
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5e2e363
Branch: refs/heads/master
Commit: b5e2e3637a85bd690d2e2c0281d4c210cce1d95c
Parents: 042ad7b
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Jun 9 10:56:27 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 9 10:56:53 2016 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/api/java/utils/ParameterTool.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e2e363/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 6be78e2..46c7387 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -152,7 +152,7 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
public static ParameterTool fromPropertiesFile(String path) throws IOException {
File propertiesFile = new File(path);
if(!propertiesFile.exists()) {
- throw new FileNotFoundException("Properties file "+path+" does not exist");
+ throw new FileNotFoundException("Properties file " + propertiesFile.getAbsolutePath() + " does not exist");
}
Properties props = new Properties();
FileInputStream fis = new FileInputStream(propertiesFile);