You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/05 14:42:41 UTC

[1/5] flink git commit: [FLINK-5349] [docs] Fix typos in Twitter connector example This closes #3015.

Repository: flink
Updated Branches:
  refs/heads/release-1.2 a6a59999b -> 9c0c19aae


[FLINK-5349] [docs] Fix typos in Twitter connector example
This closes #3015.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/335175e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/335175e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/335175e6

Branch: refs/heads/release-1.2
Commit: 335175e6eefc260cf1600544639594d85836f7d8
Parents: a6a5999
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Fri Dec 16 07:56:46 2016 +0000
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 14:00:52 2017 +0100

----------------------------------------------------------------------
 docs/dev/connectors/twitter.md | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/335175e6/docs/dev/connectors/twitter.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md
index 0ccbbff..9b6a019 100644
--- a/docs/dev/connectors/twitter.md
+++ b/docs/dev/connectors/twitter.md
@@ -58,20 +58,20 @@ In contrast to other connectors, the `TwitterSource` depends on no additional se
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Properties props = new Properties();
-p.setProperty(TwitterSource.CONSUMER_KEY, "");
-p.setProperty(TwitterSource.CONSUMER_SECRET, "");
-p.setProperty(TwitterSource.TOKEN, "");
-p.setProperty(TwitterSource.TOKEN_SECRET, "");
+props.setProperty(TwitterSource.CONSUMER_KEY, "");
+props.setProperty(TwitterSource.CONSUMER_SECRET, "");
+props.setProperty(TwitterSource.TOKEN, "");
+props.setProperty(TwitterSource.TOKEN_SECRET, "");
 DataStream<String> streamSource = env.addSource(new TwitterSource(props));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val props = new Properties();
-p.setProperty(TwitterSource.CONSUMER_KEY, "");
-p.setProperty(TwitterSource.CONSUMER_SECRET, "");
-p.setProperty(TwitterSource.TOKEN, "");
-p.setProperty(TwitterSource.TOKEN_SECRET, "");
+props.setProperty(TwitterSource.CONSUMER_KEY, "");
+props.setProperty(TwitterSource.CONSUMER_SECRET, "");
+props.setProperty(TwitterSource.TOKEN, "");
+props.setProperty(TwitterSource.TOKEN_SECRET, "");
 DataStream<String> streamSource = env.addSource(new TwitterSource(props));
 {% endhighlight %}
 </div>


[5/5] flink git commit: [FLINK-5160] Fix SecurityContextTest#testCreateInsecureHadoopContext on Windows

Posted by ch...@apache.org.
[FLINK-5160] Fix SecurityContextTest#testCreateInsecureHadoopContext on Windows

This closes #2888.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c0c19aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c0c19aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c0c19aa

Branch: refs/heads/release-1.2
Commit: 9c0c19aae5b78de71c91a735a76cd9196dc8482c
Parents: b50bbcc
Author: zentol <ch...@apache.org>
Authored: Fri Nov 25 12:51:38 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 14:01:10 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/security/SecurityUtilsTest.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c0c19aa/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index 1d38899..e7da404 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -79,20 +79,24 @@ public class SecurityUtilsTest {
 		String userName = "";
 		String osName = System.getProperty( "os.name" ).toLowerCase();
 		String className = null;
+		String methodName = null;
 
 		if( osName.contains( "windows" ) ){
 			className = "com.sun.security.auth.module.NTSystem";
+			methodName = "getName";
 		}
 		else if( osName.contains( "linux" ) || osName.contains( "mac" )  ){
 			className = "com.sun.security.auth.module.UnixSystem";
+			methodName = "getUsername";
 		}
 		else if( osName.contains( "solaris" ) || osName.contains( "sunos" ) ){
 			className = "com.sun.security.auth.module.SolarisSystem";
+			methodName = "getUsername";
 		}
 
 		if( className != null ){
 			Class<?> c = Class.forName( className );
-			Method method = c.getDeclaredMethod( "getUsername" );
+			Method method = c.getDeclaredMethod( methodName );
 			Object o = c.newInstance();
 			userName = (String) method.invoke( o );
 		}


[4/5] flink git commit: [FLINK-4255] Unstable test WebRuntimeMonitorITCase.testNoEscape

Posted by ch...@apache.org.
[FLINK-4255] Unstable test WebRuntimeMonitorITCase.testNoEscape

This closes #3019.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b50bbcc8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b50bbcc8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b50bbcc8

Branch: refs/heads/release-1.2
Commit: b50bbcc8853c1c2ebcdba9c74a70bfdfbe6557ab
Parents: 24109cb
Author: Boris Osipov <bo...@epam.com>
Authored: Fri Dec 16 10:30:33 2016 +0300
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 14:01:05 2017 +0100

----------------------------------------------------------------------
 .../webmonitor/WebRuntimeMonitorITCase.java     | 86 ++++++++------------
 1 file changed, 32 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b50bbcc8/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 853ef14..d8bd6af 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -82,25 +82,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			// Flink w/o a web monitor
 			flink = new TestingCluster(new Configuration());
 			flink.start(true);
-
-			ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
-			ActorRef jmActor = flink.jobManagerActors().get().head();
-
-			File logDir = temporaryFolder.newFolder("log");
-			Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
-			Files.createFile(new File(logDir, "jobmanager.out").toPath());
-
-			Configuration monitorConfig = new Configuration();
-			monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-			monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
-
-			// Needs to match the leader address from the leader retrieval service
-			String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
-			webMonitor = new WebRuntimeMonitor(monitorConfig, flink.createLeaderRetrievalService(),
-					jmActorSystem);
-
-			webMonitor.start(jobManagerAddress);
+			webMonitor = startWebRuntimeMonitor(flink);
 
 			try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
 				String expected = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -228,7 +210,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				String expected = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
 						.useDelimiter("\\A").next();
 
-				// Request the file from the leaading web server
+				// Request the file from the leading web server
 				leaderClient.sendGetRequest("index.html", deadline.timeLeft());
 
 				HttpTestClient.SimpleHttpResponse response = leaderClient.getNextResponse(deadline.timeLeft());
@@ -352,23 +334,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		try {
 			flink = new TestingCluster(new Configuration());
 			flink.start(true);
-
-			ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
-			ActorRef jmActor = flink.jobManagerActors().get().head();
-
-			// Needs to match the leader address from the leader retrieval service
-			String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
-			// Web frontend on random port
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
-			webMonitor = new WebRuntimeMonitor(
-					config,
-					flink.createLeaderRetrievalService(),
-					jmActorSystem);
-
-			webMonitor.start(jobManagerAddress);
+			webMonitor = startWebRuntimeMonitor(flink);
 
 			try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
 				String expectedIndex = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -430,23 +396,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		try {
 			flink = new TestingCluster(new Configuration());
 			flink.start(true);
-
-			ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
-			ActorRef jmActor = flink.jobManagerActors().get().head();
-
-			// Needs to match the leader address from the leader retrieval service
-			String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
-			// Web frontend on random port
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
-			webMonitor = new WebRuntimeMonitor(
-					config,
-					flink.createLeaderRetrievalService(),
-					jmActorSystem);
-
-			webMonitor.start(jobManagerAddress);
+			webMonitor = startWebRuntimeMonitor(flink);
 
 			try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
 				String expectedIndex = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -491,6 +441,34 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		}
 	}
 
+	private WebRuntimeMonitor startWebRuntimeMonitor(
+		TestingCluster flink) throws Exception {
+
+		ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
+		ActorRef jmActor = flink.jobManagerActors().get().head();
+
+		// Needs to match the leader address from the leader retrieval service
+		String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
+
+		File logDir = temporaryFolder.newFolder("log");
+		Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
+		Files.createFile(new File(logDir, "jobmanager.out").toPath());
+
+		// Web frontend on random port
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
+
+		WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
+			config,
+			flink.createLeaderRetrievalService(),
+			jmActorSystem);
+
+		webMonitor.start(jobManagerAddress);
+		flink.waitForActorsToBeAlive();
+		return webMonitor;
+	}
+
 	// ------------------------------------------------------------------------
 
 	private void waitForLeaderNotification(


[3/5] flink git commit: [FLINK-4870] Fix path handling in ContinuousFileMonitoringFunction

Posted by ch...@apache.org.
[FLINK-4870] Fix path handling in ContinuousFileMonitoringFunction

This closes #2887.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24109cb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24109cb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24109cb2

Branch: refs/heads/release-1.2
Commit: 24109cb2692f1f0dd2b9f8c9c8dcc02e55148bab
Parents: bb46fff
Author: zentol <ch...@apache.org>
Authored: Fri Nov 25 13:27:43 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 14:01:01 2017 +0100

----------------------------------------------------------------------
 .../api/functions/source/ContinuousFileMonitoringFunction.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24109cb2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 1ec9a70..e0a042a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -187,8 +186,9 @@ public class ContinuousFileMonitoringFunction<OUT>
 
 	@Override
 	public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
-		FileSystem fileSystem = FileSystem.get(new URI(path));
-		if (!fileSystem.exists(new Path(path))) {
+		Path p = new Path(path);
+		FileSystem fileSystem = FileSystem.get(p.toUri());
+		if (!fileSystem.exists(p)) {
 			throw new FileNotFoundException("The provided file path " + path + " does not exist.");
 		}
 


[2/5] flink git commit: [FLINK-5323] [docs] Replace CheckpointNotifier with CheckpointListener

Posted by ch...@apache.org.
[FLINK-5323] [docs] Replace CheckpointNotifier with CheckpointListener

THis closes #3006.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb46fffe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb46fffe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb46fffe

Branch: refs/heads/release-1.2
Commit: bb46fffe310f9cd6f667293df14d98e90011d591
Parents: 335175e
Author: Abhishek R. Singh <ab...@tetrationanalytics.com>
Authored: Wed Dec 14 06:05:11 2016 -0800
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 14:00:56 2017 +0100

----------------------------------------------------------------------
 docs/dev/state.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb46fffe/docs/dev/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/state.md b/docs/dev/state.md
index a772a03..99f9055 100644
--- a/docs/dev/state.md
+++ b/docs/dev/state.md
@@ -241,7 +241,7 @@ Instance fields can be checkpointed by using the `Checkpointed` interface.
 When the user-defined function implements the `Checkpointed` interface, the `snapshotState(\u2026)` and `restoreState(\u2026)`
 methods will be executed to draw and restore function state.
 
-In addition to that, user functions can also implement the `CheckpointNotifier` interface to receive notifications on
+In addition to that, user functions can also implement the `CheckpointListener` interface to receive notifications on
 completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` method.
 Note that there is no guarantee for the user function to receive a notification if a failure happens between
 checkpoint completion and notification. The notifications should hence be treated in a way that notifications from
@@ -346,7 +346,7 @@ public static class CounterSource
 }
 {% endhighlight %}
 
-Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointNotifier` interface.
+Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
 
 ## State Checkpoints in Iterative Jobs