You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/24 11:28:29 UTC

[1/4] flink git commit: [FLINK-5885] [docs] Fix Cassandra Scala snippet

Repository: flink
Updated Branches:
  refs/heads/master 15ae922ad -> 6f3723e83


[FLINK-5885] [docs] Fix Cassandra Scala snippet

This closes #3400


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f3723e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f3723e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f3723e8

Branch: refs/heads/master
Commit: 6f3723e83ff7f2b862a3f57cebbc78060937a2b8
Parents: 813c258
Author: Andrea Sella <an...@radicalbit.io>
Authored: Thu Feb 23 10:53:56 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 24 12:15:18 2017 +0100

----------------------------------------------------------------------
 docs/dev/connectors/cassandra.md | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f3723e8/docs/dev/connectors/cassandra.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/cassandra.md b/docs/dev/connectors/cassandra.md
index 7f76b72..c897779 100644
--- a/docs/dev/connectors/cassandra.md
+++ b/docs/dev/connectors/cassandra.md
@@ -100,12 +100,11 @@ CassandraSink.addSink(input)
 CassandraSink.addSink(input)
   .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
   .setClusterBuilder(new ClusterBuilder() {
-    @Override
-    public Cluster buildCluster(Cluster.Builder builder) {
-      return builder.addContactPoint("127.0.0.1").build();
+    override def buildCluster(builder: Cluster.Builder): Cluster = {
+      builder.addContactPoint("127.0.0.1").build()
     }
   })
-  .build();
+  .build()
 {% endhighlight %}
 </div>
 </div>


[2/4] flink git commit: [hotfix] [tests] Remove sysout logging in KvStateLocationTest

Posted by se...@apache.org.
[hotfix] [tests] Remove sysout logging in KvStateLocationTest

This helps keep test log output free from clutter.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31f3d65c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31f3d65c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31f3d65c

Branch: refs/heads/master
Commit: 31f3d65c59034a9c5f40a7de34c0219792507327
Parents: f6e6e7e
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 23 16:51:14 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 24 12:15:18 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/query/KvStateLocationTest.java   | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31f3d65c/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
index ed51f62..cd5c6d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
@@ -56,8 +56,6 @@ public class KvStateLocationTest {
 			start = end + 1;
 		}
 
-		System.out.println(keyGroupRanges);
-
 		String registrationName = "asdasdasdasd";
 
 		KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numKeyGroups, registrationName);


[3/4] flink git commit: [FLINK-5854] [core] Add base Flink Exception classes

Posted by se...@apache.org.
[FLINK-5854] [core] Add base Flink Exception classes

This closes #3368


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/813c2585
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/813c2585
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/813c2585

Branch: refs/heads/master
Commit: 813c2585a49c673b71678463d719b6a85b778994
Parents: 31f3d65
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 17 16:24:35 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 24 12:15:18 2017 +0100

----------------------------------------------------------------------
 .../flink/util/DynamicCodeLoadingException.java | 61 ++++++++++++++++++++
 .../org/apache/flink/util/FlinkException.java   | 58 +++++++++++++++++++
 .../flink/util/FlinkRuntimeException.java       | 58 +++++++++++++++++++
 3 files changed, 177 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/813c2585/flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java b/flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java
new file mode 100644
index 0000000..d18b9d3
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * An exception that is thrown if the dynamic instantiation of code fails.
+ * 
+ * <p>This exception is supposed to "sum up" the zoo of exceptions typically thrown around
+ * dynamic code loading and instantiations:
+ * 
+ * <pre>{@code
+ * try {
+ *     Class.forName(classname).asSubclass(TheType.class).newInstance();
+ * }
+ * catch (ClassNotFoundException | ClassCastException | InstantiationException | IllegalAccessException e) {
+ *     throw new DynamicCodeLoadingException("Could not load and instantiate " + classname, e);
+ * }
+ * }</pre>
+ */
+@Public
+public class DynamicCodeLoadingException extends FlinkException {
+
+	private static final long serialVersionUID = -25138443817255490L;
+
+	/**
+	 * Creates a new exception with the given cause.
+	 *
+	 * @param cause The exception that caused this exception
+	 */
+	public DynamicCodeLoadingException(Throwable cause) {
+		super(cause);
+	}
+
+	/**
+	 * Creates a new exception with the given message and cause.
+	 *
+	 * @param message The exception message
+	 * @param cause The exception that caused this exception
+	 */
+	public DynamicCodeLoadingException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/813c2585/flink-core/src/main/java/org/apache/flink/util/FlinkException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FlinkException.java b/flink-core/src/main/java/org/apache/flink/util/FlinkException.java
new file mode 100644
index 0000000..550ab2c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/FlinkException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * Base class of all Flink-specific checked exceptions.
+ */
+@Public
+public class FlinkException extends Exception {
+
+	private static final long serialVersionUID = 450688772469004724L;
+
+	/**
+	 * Creates a new Exception with the given message and null as the cause.
+	 * 
+	 * @param message The exception message
+	 */
+	public FlinkException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates a new exception with a null message and the given cause.
+	 * 
+	 * @param cause The exception that caused this exception
+	 */
+	public FlinkException(Throwable cause) {
+		super(cause);
+	}
+
+	/**
+	 * Creates a new exception with the given message and cause.
+	 * 
+	 * @param message The exception message
+	 * @param cause The exception that caused this exception
+	 */
+	public FlinkException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/813c2585/flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java b/flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java
new file mode 100644
index 0000000..16b783b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * Base class of all Flink-specific unchecked exceptions.
+ */
+@Public
+public class FlinkRuntimeException extends RuntimeException {
+
+	private static final long serialVersionUID = 193141189399279147L;
+
+	/**
+	 * Creates a new Exception with the given message and null as the cause.
+	 * 
+	 * @param message The exception message
+	 */
+	public FlinkRuntimeException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates a new exception with a null message and the given cause.
+	 * 
+	 * @param cause The exception that caused this exception
+	 */
+	public FlinkRuntimeException(Throwable cause) {
+		super(cause);
+	}
+
+	/**
+	 * Creates a new exception with the given message and cause.
+	 * 
+	 * @param message The exception message
+	 * @param cause The exception that caused this exception
+	 */
+	public FlinkRuntimeException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}


[4/4] flink git commit: [FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet

Posted by se...@apache.org.
[FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6e6e7ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6e6e7ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6e6e7ec

Branch: refs/heads/master
Commit: f6e6e7ecf4d287f76698302417a9ff2ffc869477
Parents: 15ae922
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 23 16:21:46 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 24 12:15:18 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/core/fs/FileSystemSafetyNet.java    | 7 -------
 .../main/java/org/apache/flink/runtime/taskmanager/Task.java  | 7 +++++++
 2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f6e6e7ec/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
index b18cb13..eb28504 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -21,9 +21,6 @@ package org.apache.flink.core.fs;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.IOUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.URI;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -65,8 +62,6 @@ import static org.apache.flink.util.Preconditions.checkState;
 @Internal
 public class FileSystemSafetyNet {
 
-	private static final Logger LOG = LoggerFactory.getLogger(FileSystemSafetyNet.class);
-
 	/** The map from thread to the safety net registry for that thread */
 	private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
 
@@ -93,7 +88,6 @@ public class FileSystemSafetyNet {
 
 		SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
 		REGISTRIES.set(newRegistry);
-		LOG.info("Created new CloseableRegistry {} for {}", newRegistry, Thread.currentThread().getName());
 	}
 
 	/**
@@ -107,7 +101,6 @@ public class FileSystemSafetyNet {
 	public static void closeSafetyNetAndGuardedResourcesForThread() {
 		SafetyNetCloseableRegistry registry = REGISTRIES.get();
 		if (null != registry) {
-			LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
 			REGISTRIES.remove();
 			IOUtils.closeQuietly(registry);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6e6e7ec/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c9f17b8..8732c60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -553,6 +553,7 @@ public class Task implements Runnable, TaskActions {
 			// ----------------------------
 
 			// activate safety net for task thread
+			LOG.info("Creating FileSystem stream leak safety net for task {}", this);
 			FileSystemSafetyNet.initializeSafetyNetForThread();
 
 			// first of all, get a user-code classloader
@@ -792,6 +793,7 @@ public class Task implements Runnable, TaskActions {
 				removeCachedFiles(distributedCacheEntries, fileCache);
 
 				// close and de-activate safety net for task thread
+				LOG.info("Ensuring all FileSystem streams are closed for task {}", this); 
 				FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 
 				notifyFinalState();
@@ -1138,7 +1140,9 @@ public class Task implements Runnable, TaskActions {
 					@Override
 					public void run() {
 						// activate safety net for checkpointing thread
+						LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
 						FileSystemSafetyNet.initializeSafetyNetForThread();
+
 						try {
 							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
 							if (!success) {
@@ -1159,6 +1163,9 @@ public class Task implements Runnable, TaskActions {
 							}
 						} finally {
 							// close and de-activate safety net for checkpointing thread
+							LOG.debug("Ensuring all FileSystem streams are closed for {}",
+									Thread.currentThread().getName());
+
 							FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 						}
 					}