You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/05 12:55:32 UTC

[1/5] flink git commit: [FLINK-5160] Fix SecurityContextTest#testCreateInsecureHadoopContext on Windows

Repository: flink
Updated Branches:
  refs/heads/master 3070ff9a6 -> 016d90884


[FLINK-5160] Fix SecurityContextTest#testCreateInsecureHadoopContext on Windows

This closes #2888.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/016d9088
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/016d9088
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/016d9088

Branch: refs/heads/master
Commit: 016d90884cc6c6d3aa52d0b1634cc945ea0f2bf0
Parents: 411fff5
Author: zentol <ch...@apache.org>
Authored: Fri Nov 25 12:51:38 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 12:28:18 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/security/SecurityUtilsTest.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/016d9088/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index 1d38899..e7da404 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -79,20 +79,24 @@ public class SecurityUtilsTest {
 		String userName = "";
 		String osName = System.getProperty( "os.name" ).toLowerCase();
 		String className = null;
+		String methodName = null;
 
 		if( osName.contains( "windows" ) ){
 			className = "com.sun.security.auth.module.NTSystem";
+			methodName = "getName";
 		}
 		else if( osName.contains( "linux" ) || osName.contains( "mac" )  ){
 			className = "com.sun.security.auth.module.UnixSystem";
+			methodName = "getUsername";
 		}
 		else if( osName.contains( "solaris" ) || osName.contains( "sunos" ) ){
 			className = "com.sun.security.auth.module.SolarisSystem";
+			methodName = "getUsername";
 		}
 
 		if( className != null ){
 			Class<?> c = Class.forName( className );
-			Method method = c.getDeclaredMethod( "getUsername" );
+			Method method = c.getDeclaredMethod( methodName );
 			Object o = c.newInstance();
 			userName = (String) method.invoke( o );
 		}


[2/5] flink git commit: [FLINK-4870] Fix path handling in ContinuousFileMonitoringFunction

Posted by ch...@apache.org.
[FLINK-4870] Fix path handling in ContinuousFileMonitoringFunction

This closes #2887.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed83b5b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed83b5b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed83b5b8

Branch: refs/heads/master
Commit: ed83b5b89c61cac5d49e690279dab0ea94c35bb3
Parents: 4415dfb
Author: zentol <ch...@apache.org>
Authored: Fri Nov 25 13:27:43 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 12:28:18 2017 +0100

----------------------------------------------------------------------
 .../api/functions/source/ContinuousFileMonitoringFunction.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed83b5b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 1ec9a70..e0a042a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -187,8 +186,9 @@ public class ContinuousFileMonitoringFunction<OUT>
 
 	@Override
 	public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
-		FileSystem fileSystem = FileSystem.get(new URI(path));
-		if (!fileSystem.exists(new Path(path))) {
+		Path p = new Path(path);
+		FileSystem fileSystem = FileSystem.get(p.toUri());
+		if (!fileSystem.exists(p)) {
 			throw new FileNotFoundException("The provided file path " + path + " does not exist.");
 		}
 


[4/5] flink git commit: [FLINK-4255] Unstable test WebRuntimeMonitorITCase.testNoEscape

Posted by ch...@apache.org.
[FLINK-4255] Unstable test WebRuntimeMonitorITCase.testNoEscape

This closes #3019.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/411fff58
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/411fff58
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/411fff58

Branch: refs/heads/master
Commit: 411fff58405804a7f7f79536f8c6885a491dbef6
Parents: ed83b5b
Author: Boris Osipov <bo...@epam.com>
Authored: Fri Dec 16 10:30:33 2016 +0300
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 12:28:18 2017 +0100

----------------------------------------------------------------------
 .../webmonitor/WebRuntimeMonitorITCase.java     | 86 ++++++++------------
 1 file changed, 32 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/411fff58/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 853ef14..d8bd6af 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -82,25 +82,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			// Flink w/o a web monitor
 			flink = new TestingCluster(new Configuration());
 			flink.start(true);
-
-			ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
-			ActorRef jmActor = flink.jobManagerActors().get().head();
-
-			File logDir = temporaryFolder.newFolder("log");
-			Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
-			Files.createFile(new File(logDir, "jobmanager.out").toPath());
-
-			Configuration monitorConfig = new Configuration();
-			monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-			monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
-
-			// Needs to match the leader address from the leader retrieval service
-			String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
-			webMonitor = new WebRuntimeMonitor(monitorConfig, flink.createLeaderRetrievalService(),
-					jmActorSystem);
-
-			webMonitor.start(jobManagerAddress);
+			webMonitor = startWebRuntimeMonitor(flink);
 
 			try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
 				String expected = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -228,7 +210,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				String expected = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
 						.useDelimiter("\\A").next();
 
-				// Request the file from the leaading web server
+				// Request the file from the leading web server
 				leaderClient.sendGetRequest("index.html", deadline.timeLeft());
 
 				HttpTestClient.SimpleHttpResponse response = leaderClient.getNextResponse(deadline.timeLeft());
@@ -352,23 +334,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		try {
 			flink = new TestingCluster(new Configuration());
 			flink.start(true);
-
-			ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
-			ActorRef jmActor = flink.jobManagerActors().get().head();
-
-			// Needs to match the leader address from the leader retrieval service
-			String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
-			// Web frontend on random port
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
-			webMonitor = new WebRuntimeMonitor(
-					config,
-					flink.createLeaderRetrievalService(),
-					jmActorSystem);
-
-			webMonitor.start(jobManagerAddress);
+			webMonitor = startWebRuntimeMonitor(flink);
 
 			try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
 				String expectedIndex = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -430,23 +396,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		try {
 			flink = new TestingCluster(new Configuration());
 			flink.start(true);
-
-			ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
-			ActorRef jmActor = flink.jobManagerActors().get().head();
-
-			// Needs to match the leader address from the leader retrieval service
-			String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
-			// Web frontend on random port
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
-			webMonitor = new WebRuntimeMonitor(
-					config,
-					flink.createLeaderRetrievalService(),
-					jmActorSystem);
-
-			webMonitor.start(jobManagerAddress);
+			webMonitor = startWebRuntimeMonitor(flink);
 
 			try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
 				String expectedIndex = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -491,6 +441,34 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		}
 	}
 
+	private WebRuntimeMonitor startWebRuntimeMonitor(
+		TestingCluster flink) throws Exception {
+
+		ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
+		ActorRef jmActor = flink.jobManagerActors().get().head();
+
+		// Needs to match the leader address from the leader retrieval service
+		String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
+
+		File logDir = temporaryFolder.newFolder("log");
+		Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
+		Files.createFile(new File(logDir, "jobmanager.out").toPath());
+
+		// Web frontend on random port
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
+
+		WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
+			config,
+			flink.createLeaderRetrievalService(),
+			jmActorSystem);
+
+		webMonitor.start(jobManagerAddress);
+		flink.waitForActorsToBeAlive();
+		return webMonitor;
+	}
+
 	// ------------------------------------------------------------------------
 
 	private void waitForLeaderNotification(


[3/5] flink git commit: [FLINK-5349] [docs] Fix typos in Twitter connector example This closes #3015.

Posted by ch...@apache.org.
[FLINK-5349] [docs] Fix typos in Twitter connector example
This closes #3015.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f155026
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f155026
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f155026

Branch: refs/heads/master
Commit: 1f1550264121ec58aecc872dc9a4deea43b3ecd4
Parents: 3070ff9
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Fri Dec 16 07:56:46 2016 +0000
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 12:28:18 2017 +0100

----------------------------------------------------------------------
 docs/dev/connectors/twitter.md | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f155026/docs/dev/connectors/twitter.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/twitter.md b/docs/dev/connectors/twitter.md
index 0ccbbff..9b6a019 100644
--- a/docs/dev/connectors/twitter.md
+++ b/docs/dev/connectors/twitter.md
@@ -58,20 +58,20 @@ In contrast to other connectors, the `TwitterSource` depends on no additional se
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Properties props = new Properties();
-p.setProperty(TwitterSource.CONSUMER_KEY, "");
-p.setProperty(TwitterSource.CONSUMER_SECRET, "");
-p.setProperty(TwitterSource.TOKEN, "");
-p.setProperty(TwitterSource.TOKEN_SECRET, "");
+props.setProperty(TwitterSource.CONSUMER_KEY, "");
+props.setProperty(TwitterSource.CONSUMER_SECRET, "");
+props.setProperty(TwitterSource.TOKEN, "");
+props.setProperty(TwitterSource.TOKEN_SECRET, "");
 DataStream<String> streamSource = env.addSource(new TwitterSource(props));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val props = new Properties();
-p.setProperty(TwitterSource.CONSUMER_KEY, "");
-p.setProperty(TwitterSource.CONSUMER_SECRET, "");
-p.setProperty(TwitterSource.TOKEN, "");
-p.setProperty(TwitterSource.TOKEN_SECRET, "");
+props.setProperty(TwitterSource.CONSUMER_KEY, "");
+props.setProperty(TwitterSource.CONSUMER_SECRET, "");
+props.setProperty(TwitterSource.TOKEN, "");
+props.setProperty(TwitterSource.TOKEN_SECRET, "");
 DataStream<String> streamSource = env.addSource(new TwitterSource(props));
 {% endhighlight %}
 </div>


[5/5] flink git commit: [FLINK-5323] [docs] Replace CheckpointNotifier with CheckpointListener

Posted by ch...@apache.org.
[FLINK-5323] [docs] Replace CheckpointNotifier with CheckpointListener

THis closes #3006.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4415dfbd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4415dfbd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4415dfbd

Branch: refs/heads/master
Commit: 4415dfbd5753196c09940b59ad39de9f4e9402b9
Parents: 1f15502
Author: Abhishek R. Singh <ab...@tetrationanalytics.com>
Authored: Wed Dec 14 06:05:11 2016 -0800
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 12:28:18 2017 +0100

----------------------------------------------------------------------
 docs/dev/state.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4415dfbd/docs/dev/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/state.md b/docs/dev/state.md
index a772a03..99f9055 100644
--- a/docs/dev/state.md
+++ b/docs/dev/state.md
@@ -241,7 +241,7 @@ Instance fields can be checkpointed by using the `Checkpointed` interface.
 When the user-defined function implements the `Checkpointed` interface, the `snapshotState(\u2026)` and `restoreState(\u2026)`
 methods will be executed to draw and restore function state.
 
-In addition to that, user functions can also implement the `CheckpointNotifier` interface to receive notifications on
+In addition to that, user functions can also implement the `CheckpointListener` interface to receive notifications on
 completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` method.
 Note that there is no guarantee for the user function to receive a notification if a failure happens between
 checkpoint completion and notification. The notifications should hence be treated in a way that notifications from
@@ -346,7 +346,7 @@ public static class CounterSource
 }
 {% endhighlight %}
 
-Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointNotifier` interface.
+Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
 
 ## State Checkpoints in Iterative Jobs