You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/24 11:28:29 UTC
[1/4] flink git commit: [FLINK-5885] [docs] Fix Cassandra Scala snippet
Repository: flink
Updated Branches:
refs/heads/master 15ae922ad -> 6f3723e83
[FLINK-5885] [docs] Fix Cassandra Scala snippet
This closes #3400
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f3723e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f3723e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f3723e8
Branch: refs/heads/master
Commit: 6f3723e83ff7f2b862a3f57cebbc78060937a2b8
Parents: 813c258
Author: Andrea Sella <an...@radicalbit.io>
Authored: Thu Feb 23 10:53:56 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 24 12:15:18 2017 +0100
----------------------------------------------------------------------
docs/dev/connectors/cassandra.md | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6f3723e8/docs/dev/connectors/cassandra.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/cassandra.md b/docs/dev/connectors/cassandra.md
index 7f76b72..c897779 100644
--- a/docs/dev/connectors/cassandra.md
+++ b/docs/dev/connectors/cassandra.md
@@ -100,12 +100,11 @@ CassandraSink.addSink(input)
CassandraSink.addSink(input)
.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.setClusterBuilder(new ClusterBuilder() {
- @Override
- public Cluster buildCluster(Cluster.Builder builder) {
- return builder.addContactPoint("127.0.0.1").build();
+ override def buildCluster(builder: Cluster.Builder): Cluster = {
+ builder.addContactPoint("127.0.0.1").build()
}
})
- .build();
+ .build()
{% endhighlight %}
</div>
</div>
[2/4] flink git commit: [hotfix] [tests] Remove sysout logging in KvStateLocationTest
Posted by se...@apache.org.
[hotfix] [tests] Remove sysout logging in KvStateLocationTest
This helps keep test log output free from clutter.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31f3d65c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31f3d65c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31f3d65c
Branch: refs/heads/master
Commit: 31f3d65c59034a9c5f40a7de34c0219792507327
Parents: f6e6e7e
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 23 16:51:14 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 24 12:15:18 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/runtime/query/KvStateLocationTest.java | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/31f3d65c/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
index ed51f62..cd5c6d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
@@ -56,8 +56,6 @@ public class KvStateLocationTest {
start = end + 1;
}
- System.out.println(keyGroupRanges);
-
String registrationName = "asdasdasdasd";
KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numKeyGroups, registrationName);
[3/4] flink git commit: [FLINK-5854] [core] Add base Flink Exception classes
Posted by se...@apache.org.
[FLINK-5854] [core] Add base Flink Exception classes
This closes #3368
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/813c2585
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/813c2585
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/813c2585
Branch: refs/heads/master
Commit: 813c2585a49c673b71678463d719b6a85b778994
Parents: 31f3d65
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 17 16:24:35 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 24 12:15:18 2017 +0100
----------------------------------------------------------------------
.../flink/util/DynamicCodeLoadingException.java | 61 ++++++++++++++++++++
.../org/apache/flink/util/FlinkException.java | 58 +++++++++++++++++++
.../flink/util/FlinkRuntimeException.java | 58 +++++++++++++++++++
3 files changed, 177 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/813c2585/flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java b/flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java
new file mode 100644
index 0000000..d18b9d3
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * An exception that is thrown if the dynamic instantiation of code fails.
+ *
+ * <p>This exception is supposed to "sum up" the zoo of exceptions typically thrown around
+ * dynamic code loading and instantiations:
+ *
+ * <pre>{@code
+ * try {
+ * Class.forName(classname).asSubclass(TheType.class).newInstance();
+ * }
+ * catch (ClassNotFoundException | ClassCastException | InstantiationException | IllegalAccessException e) {
+ * throw new DynamicCodeLoadingException("Could not load and instantiate " + classname, e);
+ * }
+ * }</pre>
+ */
+@Public
+public class DynamicCodeLoadingException extends FlinkException {
+
+ private static final long serialVersionUID = -25138443817255490L;
+
+ /**
+ * Creates a new exception with the given cause.
+ *
+ * @param cause The exception that caused this exception
+ */
+ public DynamicCodeLoadingException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a new exception with the given message and cause.
+ *
+ * @param message The exception message
+ * @param cause The exception that caused this exception
+ */
+ public DynamicCodeLoadingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/813c2585/flink-core/src/main/java/org/apache/flink/util/FlinkException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FlinkException.java b/flink-core/src/main/java/org/apache/flink/util/FlinkException.java
new file mode 100644
index 0000000..550ab2c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/FlinkException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * Base class of all Flink-specific checked exceptions.
+ */
+@Public
+public class FlinkException extends Exception {
+
+ private static final long serialVersionUID = 450688772469004724L;
+
+ /**
+ * Creates a new exception with the given message and null as the cause.
+ *
+ * @param message The exception message
+ */
+ public FlinkException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a new exception with a null message and the given cause.
+ *
+ * @param cause The exception that caused this exception
+ */
+ public FlinkException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a new exception with the given message and cause.
+ *
+ * @param message The exception message
+ * @param cause The exception that caused this exception
+ */
+ public FlinkException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/813c2585/flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java b/flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java
new file mode 100644
index 0000000..16b783b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * Base class of all Flink-specific unchecked exceptions.
+ */
+@Public
+public class FlinkRuntimeException extends RuntimeException {
+
+ private static final long serialVersionUID = 193141189399279147L;
+
+ /**
+ * Creates a new exception with the given message and null as the cause.
+ *
+ * @param message The exception message
+ */
+ public FlinkRuntimeException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a new exception with a null message and the given cause.
+ *
+ * @param cause The exception that caused this exception
+ */
+ public FlinkRuntimeException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a new exception with the given message and cause.
+ *
+ * @param message The exception message
+ * @param cause The exception that caused this exception
+ */
+ public FlinkRuntimeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
[4/4] flink git commit: [FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet
Posted by se...@apache.org.
[FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6e6e7ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6e6e7ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6e6e7ec
Branch: refs/heads/master
Commit: f6e6e7ecf4d287f76698302417a9ff2ffc869477
Parents: 15ae922
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 23 16:21:46 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 24 12:15:18 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/core/fs/FileSystemSafetyNet.java | 7 -------
.../main/java/org/apache/flink/runtime/taskmanager/Task.java | 7 +++++++
2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f6e6e7ec/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
index b18cb13..eb28504 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -21,9 +21,6 @@ package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.URI;
import static org.apache.flink.util.Preconditions.checkState;
@@ -65,8 +62,6 @@ import static org.apache.flink.util.Preconditions.checkState;
@Internal
public class FileSystemSafetyNet {
- private static final Logger LOG = LoggerFactory.getLogger(FileSystemSafetyNet.class);
-
/** The map from thread to the safety net registry for that thread */
private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
@@ -93,7 +88,6 @@ public class FileSystemSafetyNet {
SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
REGISTRIES.set(newRegistry);
- LOG.info("Created new CloseableRegistry {} for {}", newRegistry, Thread.currentThread().getName());
}
/**
@@ -107,7 +101,6 @@ public class FileSystemSafetyNet {
public static void closeSafetyNetAndGuardedResourcesForThread() {
SafetyNetCloseableRegistry registry = REGISTRIES.get();
if (null != registry) {
- LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
REGISTRIES.remove();
IOUtils.closeQuietly(registry);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6e6e7ec/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c9f17b8..8732c60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -553,6 +553,7 @@ public class Task implements Runnable, TaskActions {
// ----------------------------
// activate safety net for task thread
+ LOG.info("Creating FileSystem stream leak safety net for task {}", this);
FileSystemSafetyNet.initializeSafetyNetForThread();
// first of all, get a user-code classloader
@@ -792,6 +793,7 @@ public class Task implements Runnable, TaskActions {
removeCachedFiles(distributedCacheEntries, fileCache);
// close and de-activate safety net for task thread
+ LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
notifyFinalState();
@@ -1138,7 +1140,9 @@ public class Task implements Runnable, TaskActions {
@Override
public void run() {
// activate safety net for checkpointing thread
+ LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
FileSystemSafetyNet.initializeSafetyNetForThread();
+
try {
boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
if (!success) {
@@ -1159,6 +1163,9 @@ public class Task implements Runnable, TaskActions {
}
} finally {
// close and de-activate safety net for checkpointing thread
+ LOG.debug("Ensuring all FileSystem streams are closed for {}",
+ Thread.currentThread().getName());
+
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}