You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/23 14:25:19 UTC

[1/2] flink git commit: [FLINK-6492] Fix unclosed DataOutputViewStream usage

Repository: flink
Updated Branches:
  refs/heads/master f369f8640 -> 557540a51


[FLINK-6492] Fix unclosed DataOutputViewStream usage

This closes #3898.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ded464b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ded464b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ded464b8

Branch: refs/heads/master
Commit: ded464b8bd60d8f19221c0f1589346684c11c78d
Parents: f369f86
Author: huafengw <fv...@gmail.com>
Authored: Mon May 15 15:56:19 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 23 20:46:43 2017 +0800

----------------------------------------------------------------------
 .../base/GenericArraySerializerConfigSnapshot.java          | 8 +++++---
 .../runtime/KryoRegistrationSerializerConfigSnapshot.java   | 9 ++++++---
 .../typeutils/runtime/TupleSerializerConfigSnapshot.java    | 8 +++++---
 .../java/org/apache/flink/runtime/state/JavaSerializer.java | 8 +++++---
 4 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ded464b8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index 79dcf89..70e5210 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -58,15 +58,17 @@ public final class GenericArraySerializerConfigSnapshot<C> extends CompositeType
 	public void write(DataOutputView out) throws IOException {
 		super.write(out);
 
-		InstantiationUtil.serializeObject(new DataOutputViewStream(out), componentClass);
+		try (final DataOutputViewStream outViewWrapper = new DataOutputViewStream(out)) {
+			InstantiationUtil.serializeObject(outViewWrapper, componentClass);
+		}
 	}
 
 	@Override
 	public void read(DataInputView in) throws IOException {
 		super.read(in);
 
-		try {
-			componentClass = InstantiationUtil.deserializeObject(new DataInputViewStream(in), getUserCodeClassLoader());
+		try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
+			componentClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
 		} catch (ClassNotFoundException e) {
 			throw new IOException("Could not find requested element class in classpath.", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded464b8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
index 3a42d69..14287ca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
@@ -133,7 +133,9 @@ public abstract class KryoRegistrationSerializerConfigSnapshot<T> extends Generi
 					out.writeUTF(kryoRegistration.getSerializerClass().getName());
 					break;
 				case INSTANCE:
-					InstantiationUtil.serializeObject(new DataOutputViewStream(out), kryoRegistration.getSerializableSerializerInstance());
+					try (final DataOutputViewStream outViewWrapper = new DataOutputViewStream(out)) {
+						InstantiationUtil.serializeObject(outViewWrapper, kryoRegistration.getSerializableSerializerInstance());
+					}
 					break;
 				default:
 					// this should not happen; adding as a guard for the future
@@ -184,8 +186,9 @@ public abstract class KryoRegistrationSerializerConfigSnapshot<T> extends Generi
 
 				case INSTANCE:
 					ExecutionConfig.SerializableSerializer<? extends Serializer<RC>> serializerInstance;
-					try {
-						serializerInstance = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader);
+
+					try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
+						serializerInstance = InstantiationUtil.deserializeObject(inViewWrapper, userCodeClassLoader);
 					} catch (ClassNotFoundException e) {
 						LOG.warn("Cannot find registered Kryo serializer class for class " + registeredClassname +
 								" in classpath; using a dummy Kryo serializer that should be replaced as soon as" +

http://git-wip-us.apache.org/repos/asf/flink/blob/ded464b8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index 1e7701c..705099e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -51,15 +51,17 @@ public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSeriali
 	public void write(DataOutputView out) throws IOException {
 		super.write(out);
 
-		InstantiationUtil.serializeObject(new DataOutputViewStream(out), tupleClass);
+		try (final DataOutputViewStream outViewWrapper = new DataOutputViewStream(out)) {
+			InstantiationUtil.serializeObject(outViewWrapper, tupleClass);
+		}
 	}
 
 	@Override
 	public void read(DataInputView in) throws IOException {
 		super.read(in);
 
-		try {
-			tupleClass = InstantiationUtil.deserializeObject(new DataInputViewStream(in), getUserCodeClassLoader());
+		try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
+			tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
 		} catch (ClassNotFoundException e) {
 			throw new IOException("Could not find requested tuple class in classpath.", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded464b8/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
index 5252b3d..7d9e888 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -66,14 +66,16 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet
 
 	@Override
 	public void serialize(T record, DataOutputView target) throws IOException {
-		InstantiationUtil.serializeObject(new DataOutputViewStream(target), record);
+		try (final DataOutputViewStream outViewWrapper = new DataOutputViewStream(target)) {
+			InstantiationUtil.serializeObject(outViewWrapper, record);
+		}
 	}
 
 	@Override
 	public T deserialize(DataInputView source) throws IOException {
-		try {
+		try (final DataInputViewStream inViewWrapper = new DataInputViewStream(source)) {
 			return InstantiationUtil.deserializeObject(
-					new DataInputViewStream(source),
+					inViewWrapper,
 					Thread.currentThread().getContextClassLoader());
 		} catch (ClassNotFoundException e) {
 			throw new IOException("Could not deserialize object.", e);


[2/2] flink git commit: [FLINK-6660] [docs] Expand the connectors overview page

Posted by tz...@apache.org.
[FLINK-6660] [docs] Expand the connectors overview page

This closes #3964.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/557540a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/557540a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/557540a5

Branch: refs/heads/master
Commit: 557540a51cf8a1d6fef1e2e80ad0db4c148b3302
Parents: ded464b
Author: David Anderson <da...@alpinegizmo.com>
Authored: Mon May 22 17:39:34 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 23 20:48:01 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/filesystem_sink.md |  2 +-
 docs/dev/connectors/index.md           | 56 +++++++++++++++++++++--------
 docs/dev/connectors/twitter.md         |  2 +-
 3 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/557540a5/docs/dev/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index d12752f..2d48876 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -24,7 +24,7 @@ under the License.
 -->
 
 This connector provides a Sink that writes partitioned files to any filesystem supported by
-Hadoop FileSystem. To use this connector, add the
+[Hadoop FileSystem](http://hadoop.apache.org). To use this connector, add the
 following dependency to your project:
 
 {% highlight xml %}

http://git-wip-us.apache.org/repos/asf/flink/blob/557540a5/docs/dev/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md
index f5c3eec..ff76aee 100644
--- a/docs/dev/connectors/index.md
+++ b/docs/dev/connectors/index.md
@@ -25,22 +25,50 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Connectors provide code for interfacing with various third-party systems.
+* toc
+{:toc}
 
-Currently these systems are supported: (Please select the respective documentation page from the navigation on the left.)
+## Predefined Sources and Sinks
 
- * [Apache Kafka](https://kafka.apache.org/) (sink/source)
- * [Elasticsearch](https://elastic.co/) (sink)
- * [Hadoop FileSystem](http://hadoop.apache.org) (sink)
- * [RabbitMQ](http://www.rabbitmq.com/) (sink/source)
- * [Amazon Kinesis Streams](http://aws.amazon.com/kinesis/streams/) (sink/source)
- * [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) (source)
- * [Apache NiFi](https://nifi.apache.org) (sink/source)
- * [Apache Cassandra](https://cassandra.apache.org/) (sink)
+A few basic data sources and sinks are built into Flink and are always available.
+The [predefined data sources]({{ site.baseurl }}/dev/datastream_api.html#data-sources) include reading from files, directories, and sockets, and
+ingesting data from collections and iterators.
+The [predefined data sinks]({{ site.baseurl }}/dev/datastream_api.html#data-sinks) support writing to files, to stdout and stderr, and to sockets.
 
+## Bundled Connectors
 
+Connectors provide code for interfacing with various third-party systems. Currently these systems are supported:
 
-To run an application using one of these connectors, additional third party
-components are usually required to be installed and launched, e.g. the servers
-for the message queues. Further instructions for these can be found in the
-corresponding subsections.
+ * [Apache Kafka](kafka.html) (sink/source)
+ * [Apache Cassandra](cassandra.html) (sink)
+ * [Amazon Kinesis Streams](kinesis.html) (sink/source)
+ * [Elasticsearch](elasticsearch.html) (sink)
+ * [Hadoop FileSystem](filesystem_sink.html) (sink)
+ * [RabbitMQ](rabbitmq.html) (sink/source)
+ * [Apache NiFi](nifi.html) (sink/source)
+ * [Twitter Streaming API](twitter.html) (source)
+
+Keep in mind that to use one of these connectors in an application, additional third party
+components are usually required, e.g. servers for the data stores or message queues.
+Note also that while the streaming connectors listed in this section are part of the
+Flink project and are included in source releases, they are not included in the binary distributions. 
+Further instructions can be found in the corresponding subsections.
+
+## Other Ways to Connect to Flink
+
+### Data Enrichment via Async I/O
+
+Using a connector isn't the only way to get data in and out of Flink.
+One common pattern is to query an external database or web service in a `Map` or `FlatMap`
+in order to enrich the primary datastream.
+Flink offers an API for [Asynchronous I/O]({{ site.baseurl }}/dev/stream/asyncio.html)
+to make it easier to do this kind of enrichment efficiently and robustly.
+
+### Queryable State
+
+When a Flink application pushes a lot of data to an external data store, this
+can become an I/O bottleneck.
+If the data involved has many fewer reads than writes, a better approach can be
+for an external application to pull from Flink the data it needs.
+The [Queryable State]({{ site.baseurl }}/dev/stream/queryable_state.html) interface
+enables this by allowing the state being managed by Flink to be queried on demand.

http://git-wip-us.apache.org/repos/asf/flink/blob/557540a5/docs/dev/connectors/twitter.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md
index 5fb7d68..0cded6a 100644
--- a/docs/dev/connectors/twitter.md
+++ b/docs/dev/connectors/twitter.md
@@ -23,7 +23,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-The Twitter Streaming API provides access to the stream of tweets made available by Twitter.
+The [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) provides access to the stream of tweets made available by Twitter.
 Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream.
 To use this connector, add the following dependency to your project: