Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/05 14:42:41 UTC
[1/5] flink git commit: [FLINK-5349] [docs] Fix typos in Twitter
connector example This closes #3015.
Repository: flink
Updated Branches:
refs/heads/release-1.2 a6a59999b -> 9c0c19aae
[FLINK-5349] [docs] Fix typos in Twitter connector example
This closes #3015.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/335175e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/335175e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/335175e6
Branch: refs/heads/release-1.2
Commit: 335175e6eefc260cf1600544639594d85836f7d8
Parents: a6a5999
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Fri Dec 16 07:56:46 2016 +0000
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 14:00:52 2017 +0100
----------------------------------------------------------------------
docs/dev/connectors/twitter.md | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/335175e6/docs/dev/connectors/twitter.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md
index 0ccbbff..9b6a019 100644
--- a/docs/dev/connectors/twitter.md
+++ b/docs/dev/connectors/twitter.md
@@ -58,20 +58,20 @@ In contrast to other connectors, the `TwitterSource` depends on no additional se
<div data-lang="java" markdown="1">
{% highlight java %}
Properties props = new Properties();
-p.setProperty(TwitterSource.CONSUMER_KEY, "");
-p.setProperty(TwitterSource.CONSUMER_SECRET, "");
-p.setProperty(TwitterSource.TOKEN, "");
-p.setProperty(TwitterSource.TOKEN_SECRET, "");
+props.setProperty(TwitterSource.CONSUMER_KEY, "");
+props.setProperty(TwitterSource.CONSUMER_SECRET, "");
+props.setProperty(TwitterSource.TOKEN, "");
+props.setProperty(TwitterSource.TOKEN_SECRET, "");
DataStream<String> streamSource = env.addSource(new TwitterSource(props));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val props = new Properties();
-p.setProperty(TwitterSource.CONSUMER_KEY, "");
-p.setProperty(TwitterSource.CONSUMER_SECRET, "");
-p.setProperty(TwitterSource.TOKEN, "");
-p.setProperty(TwitterSource.TOKEN_SECRET, "");
+props.setProperty(TwitterSource.CONSUMER_KEY, "");
+props.setProperty(TwitterSource.CONSUMER_SECRET, "");
+props.setProperty(TwitterSource.TOKEN, "");
+props.setProperty(TwitterSource.TOKEN_SECRET, "");
DataStream<String> streamSource = env.addSource(new TwitterSource(props));
{% endhighlight %}
</div>
[5/5] flink git commit: [FLINK-5160] Fix
SecurityContextTest#testCreateInsecureHadoopContext on Windows
Posted by ch...@apache.org.
[FLINK-5160] Fix SecurityContextTest#testCreateInsecureHadoopContext on Windows
This closes #2888.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c0c19aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c0c19aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c0c19aa
Branch: refs/heads/release-1.2
Commit: 9c0c19aae5b78de71c91a735a76cd9196dc8482c
Parents: b50bbcc
Author: zentol <ch...@apache.org>
Authored: Fri Nov 25 12:51:38 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 14:01:10 2017 +0100
----------------------------------------------------------------------
.../org/apache/flink/runtime/security/SecurityUtilsTest.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9c0c19aa/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index 1d38899..e7da404 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -79,20 +79,24 @@ public class SecurityUtilsTest {
String userName = "";
String osName = System.getProperty( "os.name" ).toLowerCase();
String className = null;
+ String methodName = null;
if( osName.contains( "windows" ) ){
className = "com.sun.security.auth.module.NTSystem";
+ methodName = "getName";
}
else if( osName.contains( "linux" ) || osName.contains( "mac" ) ){
className = "com.sun.security.auth.module.UnixSystem";
+ methodName = "getUsername";
}
else if( osName.contains( "solaris" ) || osName.contains( "sunos" ) ){
className = "com.sun.security.auth.module.SolarisSystem";
+ methodName = "getUsername";
}
if( className != null ){
Class<?> c = Class.forName( className );
- Method method = c.getDeclaredMethod( "getUsername" );
+ Method method = c.getDeclaredMethod( methodName );
Object o = c.newInstance();
userName = (String) method.invoke( o );
}
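The change above picks the accessor name per platform because `NTSystem` exposes `getName` while the Unix and Solaris classes expose `getUsername`. A minimal, self-contained sketch of the same lookup follows. The class `OsUserLookup` and its methods `resolve` and `currentUser` are hypothetical names used only for illustration and are not part of Flink. The sketch also falls back to an empty string if the platform class cannot be loaded, which is an added assumption rather than behaviour taken from the test.

```java
import java.lang.reflect.Method;

// Hypothetical helper, not part of Flink: mirrors the per-OS lookup in the diff above.
class OsUserLookup {

    /** Returns {className, methodName} for an os.name value, or null if the OS is unknown. */
    static String[] resolve(String osName) {
        String os = osName.toLowerCase();
        if (os.contains("windows")) {
            return new String[] {"com.sun.security.auth.module.NTSystem", "getName"};
        }
        if (os.contains("linux") || os.contains("mac")) {
            return new String[] {"com.sun.security.auth.module.UnixSystem", "getUsername"};
        }
        if (os.contains("solaris") || os.contains("sunos")) {
            return new String[] {"com.sun.security.auth.module.SolarisSystem", "getUsername"};
        }
        return null;
    }

    /** Looks up the current user reflectively; returns "" if the lookup is not possible. */
    static String currentUser() {
        String[] target = resolve(System.getProperty("os.name"));
        if (target == null) {
            return "";
        }
        try {
            Class<?> c = Class.forName(target[0]);
            Method method = c.getDeclaredMethod(target[1]);
            Object o = c.getDeclaredConstructor().newInstance();
            return (String) method.invoke(o);
        } catch (ReflectiveOperationException | LinkageError e) {
            // Assumption for this sketch: treat a missing or unloadable class as "unknown user".
            return "";
        }
    }

    public static void main(String[] args) {
        System.out.println("user = " + currentUser());
    }
}
```

Keeping the class name and method name together in one place makes it harder for the two to drift apart, which appears to be what broke the Windows case before this fix.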
[4/5] flink git commit: [FLINK-4255] Unstable test
WebRuntimeMonitorITCase.testNoEscape
Posted by ch...@apache.org.
[FLINK-4255] Unstable test WebRuntimeMonitorITCase.testNoEscape
This closes #3019.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b50bbcc8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b50bbcc8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b50bbcc8
Branch: refs/heads/release-1.2
Commit: b50bbcc8853c1c2ebcdba9c74a70bfdfbe6557ab
Parents: 24109cb
Author: Boris Osipov <bo...@epam.com>
Authored: Fri Dec 16 10:30:33 2016 +0300
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 14:01:05 2017 +0100
----------------------------------------------------------------------
.../webmonitor/WebRuntimeMonitorITCase.java | 86 ++++++++------------
1 file changed, 32 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b50bbcc8/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 853ef14..d8bd6af 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -82,25 +82,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
// Flink w/o a web monitor
flink = new TestingCluster(new Configuration());
flink.start(true);
-
- ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
- ActorRef jmActor = flink.jobManagerActors().get().head();
-
- File logDir = temporaryFolder.newFolder("log");
- Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
- Files.createFile(new File(logDir, "jobmanager.out").toPath());
-
- Configuration monitorConfig = new Configuration();
- monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
- monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
-
- // Needs to match the leader address from the leader retrieval service
- String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
- webMonitor = new WebRuntimeMonitor(monitorConfig, flink.createLeaderRetrievalService(),
- jmActorSystem);
-
- webMonitor.start(jobManagerAddress);
+ webMonitor = startWebRuntimeMonitor(flink);
try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
String expected = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -228,7 +210,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
String expected = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
.useDelimiter("\\A").next();
- // Request the file from the leaading web server
+ // Request the file from the leading web server
leaderClient.sendGetRequest("index.html", deadline.timeLeft());
HttpTestClient.SimpleHttpResponse response = leaderClient.getNextResponse(deadline.timeLeft());
@@ -352,23 +334,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
try {
flink = new TestingCluster(new Configuration());
flink.start(true);
-
- ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
- ActorRef jmActor = flink.jobManagerActors().get().head();
-
- // Needs to match the leader address from the leader retrieval service
- String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
- // Web frontend on random port
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
- webMonitor = new WebRuntimeMonitor(
- config,
- flink.createLeaderRetrievalService(),
- jmActorSystem);
-
- webMonitor.start(jobManagerAddress);
+ webMonitor = startWebRuntimeMonitor(flink);
try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
String expectedIndex = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -430,23 +396,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
try {
flink = new TestingCluster(new Configuration());
flink.start(true);
-
- ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
- ActorRef jmActor = flink.jobManagerActors().get().head();
-
- // Needs to match the leader address from the leader retrieval service
- String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
- // Web frontend on random port
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
- webMonitor = new WebRuntimeMonitor(
- config,
- flink.createLeaderRetrievalService(),
- jmActorSystem);
-
- webMonitor.start(jobManagerAddress);
+ webMonitor = startWebRuntimeMonitor(flink);
try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
String expectedIndex = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -491,6 +441,34 @@ public class WebRuntimeMonitorITCase extends TestLogger {
}
}
+ private WebRuntimeMonitor startWebRuntimeMonitor(
+ TestingCluster flink) throws Exception {
+
+ ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
+ ActorRef jmActor = flink.jobManagerActors().get().head();
+
+ // Needs to match the leader address from the leader retrieval service
+ String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
+
+ File logDir = temporaryFolder.newFolder("log");
+ Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
+ Files.createFile(new File(logDir, "jobmanager.out").toPath());
+
+ // Web frontend on random port
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+ config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
+
+ WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
+ config,
+ flink.createLeaderRetrievalService(),
+ jmActorSystem);
+
+ webMonitor.start(jobManagerAddress);
+ flink.waitForActorsToBeAlive();
+ return webMonitor;
+ }
+
// ------------------------------------------------------------------------
private void waitForLeaderNotification(
[3/5] flink git commit: [FLINK-4870] Fix path handling in
ContinuousFileMonitoringFunction
Posted by ch...@apache.org.
[FLINK-4870] Fix path handling in ContinuousFileMonitoringFunction
This closes #2887.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24109cb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24109cb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24109cb2
Branch: refs/heads/release-1.2
Commit: 24109cb2692f1f0dd2b9f8c9c8dcc02e55148bab
Parents: bb46fff
Author: zentol <ch...@apache.org>
Authored: Fri Nov 25 13:27:43 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 14:01:01 2017 +0100
----------------------------------------------------------------------
.../api/functions/source/ContinuousFileMonitoringFunction.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/24109cb2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 1ec9a70..e0a042a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -187,8 +186,9 @@ public class ContinuousFileMonitoringFunction<OUT>
@Override
public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
- FileSystem fileSystem = FileSystem.get(new URI(path));
- if (!fileSystem.exists(new Path(path))) {
+ Path p = new Path(path);
+ FileSystem fileSystem = FileSystem.get(p.toUri());
+ if (!fileSystem.exists(p)) {
throw new FileNotFoundException("The provided file path " + path + " does not exist.");
}
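The diff above stops passing the raw path string to `new URI(...)` and derives the URI from a parsed `Path` instead. The sketch below illustrates why that matters, using only the JDK: `java.nio.file.Paths` stands in for Flink's `Path` class here, which is a substitution made so the example runs without Flink on the classpath. The class name `PathToUriSketch` and its methods are hypothetical. The motivation is presumably that some valid file paths, for example ones containing spaces or backslashes, are not valid URI strings.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;

// Hypothetical illustration; java.nio.file.Paths is used in place of Flink's Path class.
class PathToUriSketch {

    /** True if the raw string can be parsed directly by java.net.URI. */
    static boolean parsesAsUri(String raw) {
        try {
            new URI(raw);
            return true;
        } catch (URISyntaxException e) {
            return false;
        }
    }

    /** Builds the URI from a parsed path object, which escapes characters such as spaces. */
    static URI viaPath(String raw) {
        return Paths.get(raw).toUri();
    }

    public static void main(String[] args) {
        String raw = "/tmp/my dir/input";
        System.out.println("parses directly: " + parsesAsUri(raw));
        System.out.println("via path object: " + viaPath(raw));
    }
}
```

Parsing the path once and reusing the resulting object for both the file system lookup and the existence check also avoids constructing two slightly different representations of the same location.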
[2/5] flink git commit: [FLINK-5323] [docs] Replace
CheckpointNotifier with CheckpointListener
Posted by ch...@apache.org.
[FLINK-5323] [docs] Replace CheckpointNotifier with CheckpointListener
This closes #3006.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb46fffe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb46fffe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb46fffe
Branch: refs/heads/release-1.2
Commit: bb46fffe310f9cd6f667293df14d98e90011d591
Parents: 335175e
Author: Abhishek R. Singh <ab...@tetrationanalytics.com>
Authored: Wed Dec 14 06:05:11 2016 -0800
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 14:00:56 2017 +0100
----------------------------------------------------------------------
docs/dev/state.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bb46fffe/docs/dev/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/state.md b/docs/dev/state.md
index a772a03..99f9055 100644
--- a/docs/dev/state.md
+++ b/docs/dev/state.md
@@ -241,7 +241,7 @@ Instance fields can be checkpointed by using the `Checkpointed` interface.
When the user-defined function implements the `Checkpointed` interface, the `snapshotState(…)` and `restoreState(…)`
methods will be executed to draw and restore function state.
-In addition to that, user functions can also implement the `CheckpointNotifier` interface to receive notifications on
+In addition to that, user functions can also implement the `CheckpointListener` interface to receive notifications on
completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` method.
Note that there is no guarantee for the user function to receive a notification if a failure happens between
checkpoint completion and notification. The notifications should hence be treated in a way that notifications from
@@ -346,7 +346,7 @@ public static class CounterSource
}
{% endhighlight %}
-Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointNotifier` interface.
+Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
## State Checkpoints in Iterative Jobs
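The state.md text in the diff above notes that a user function may miss a `notifyCheckpointComplete(long checkpointId)` call if a failure happens between checkpoint completion and notification. One way to tolerate that, sketched below, is to treat each notification as covering every checkpoint up to and including the given id. The `CheckpointListener` interface declared here is a local stand-in so the example compiles without Flink. `PendingCommitter`, `stage` and `committed` are hypothetical names, and the "commit" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Local stand-in for Flink's CheckpointListener, declared here only to keep the sketch self-contained.
interface CheckpointListener {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

// Hypothetical example class: stages records per checkpoint and commits them on notification.
class PendingCommitter implements CheckpointListener {
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    /** Remembers a record that belongs to the given (not yet confirmed) checkpoint. */
    void stage(long checkpointId, String record) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(record);
    }

    /** Commits everything staged for this checkpoint and for any earlier one. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> done = pending.headMap(checkpointId, true);
        for (List<String> batch : done.values()) {
            committed.addAll(batch);
        }
        done.clear(); // the view is backed by 'pending', so this removes the committed entries
    }

    List<String> committed() {
        return committed;
    }
}
```

With this shape, a lost notification for checkpoint 1 is harmless as long as the notification for checkpoint 2 arrives, because the later call also commits the earlier batch.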