You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/07 15:00:58 UTC

[1/2] flink git commit: [FLINK-6842] [runtime] Uncomment and activate code in HadoopFileSystem

Repository: flink
Updated Branches:
  refs/heads/master d1a0935e2 -> 709f23e74


[FLINK-6842] [runtime] Uncomment and activate code in HadoopFileSystem

This closes #4219


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a76421ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a76421ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a76421ec

Branch: refs/heads/master
Commit: a76421ecadd3d1d3e0c13192a677ee4cf8e8d432
Parents: d1a0935
Author: zhangminglei <zm...@163.com>
Authored: Thu Jul 6 23:01:54 2017 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Jul 7 10:08:39 2017 -0400

----------------------------------------------------------------------
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java      | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a76421ec/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index f47423f..7ab7ab7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -459,15 +459,12 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 	@Override
 	public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
 		Configuration hadoopConf = getHadoopConfiguration();
-		Class<? extends org.apache.hadoop.fs.FileSystem> clazz;
-		// We can activate this block once we drop Hadoop1 support (only hd2 has the getFileSystemClass-method)
-//		try {
-//			clazz = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConf);
-//		} catch (IOException e) {
-//			LOG.info("Flink could not load the Hadoop File system implementation for scheme "+scheme);
-//			return null;
-//		}
-		clazz = hadoopConf.getClass("fs." + scheme + ".impl", null, org.apache.hadoop.fs.FileSystem.class);
+		Class<? extends org.apache.hadoop.fs.FileSystem> clazz = null;
+		try {
+			clazz = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConf);
+		} catch (IOException e) {
+			LOG.info("Flink could not load the Hadoop File system implementation for scheme " + scheme);
+		}
 
 		if (clazz != null && LOG.isDebugEnabled()) {
 			LOG.debug("Flink supports {} with the Hadoop file system wrapper, impl {}", scheme, clazz);


[2/2] flink git commit: [FLINK-7042] [yarn] Fix jar file discovery in flink-yarn-tests

Posted by gr...@apache.org.
[FLINK-7042] [yarn] Fix jar file discovery in flink-yarn-tests

Add dependencies for batch and streaming WordCount programs and copy
the jar files into a new target/programs directory. The integration
tests now directly reference the program jar files rather than relying
on the prior brittle search.

This removes the flink-yarn-tests build-time dependency on the examples
modules (there remains a build-time dependency on flink-dist).

This closes #4264


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/709f23e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/709f23e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/709f23e7

Branch: refs/heads/master
Commit: 709f23e742b094a5337c9333a17de7dbdc924891
Parents: a76421e
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Jul 5 12:00:30 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Jul 7 10:08:52 2017 -0400

----------------------------------------------------------------------
 flink-yarn-tests/pom.xml                        | 60 ++++++++++++++++++++
 .../YARNSessionCapacitySchedulerITCase.java     | 13 ++---
 2 files changed, 65 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/709f23e7/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index f198ece..349283a 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -81,6 +81,24 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>jar</type>
+			<classifier>WordCount</classifier>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>jar</type>
+			<classifier>WordCount</classifier>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-shaded-hadoop2</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
@@ -351,6 +369,48 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
+			<!--
+			Copy batch and streaming examples programs in to the flink-yarn-tests/target/programs
+			directory to be run during YARN integration tests.
+			-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<version>3.0.1</version>
+				<executions>
+					<execution>
+						<id>copy</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>copy</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<artifactItems>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
+							<type>jar</type>
+							<classifier>WordCount</classifier>
+							<overWrite>true</overWrite>
+							<destFileName>BatchWordCount.jar</destFileName>
+						</artifactItem>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
+							<type>jar</type>
+							<classifier>WordCount</classifier>
+							<overWrite>true</overWrite>
+							<destFileName>StreamingWordCount.jar</destFileName>
+						</artifactItem>
+					</artifactItems>
+					<outputDirectory>${project.build.directory}/programs</outputDirectory>
+					<overWriteReleases>false</overWriteReleases>
+					<overWriteSnapshots>true</overWriteSnapshots>
+				</configuration>
+			</plugin>
 		</plugins>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/709f23e7/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index b290756..7d6c5d6 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -115,7 +115,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	public void perJobYarnCluster() {
 		LOG.info("Starting perJobYarnCluster()");
 		addTestAppender(JobClient.class, Level.INFO);
-		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here.
+		File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
 		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
 		runWithArgs(new String[]{"run", "-m", "yarn-cluster",
 				"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
@@ -339,7 +339,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		// write log messages to stdout as well, so that the runWithArgs() method
 		// is catching the log output
 		addTestAppender(JobClient.class, Level.INFO);
-		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here.
+		File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
 		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
 		runWithArgs(new String[]{"run",
 				"-p", "2", //test that the job is executed with a DOP of 2
@@ -364,9 +364,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	public void testDetachedPerJobYarnCluster() {
 		LOG.info("Starting testDetachedPerJobYarnCluster()");
 
-		File exampleJarLocation = YarnTestBase.findFile(
-			".." + File.separator + "flink-examples" + File.separator + "flink-examples-batch",
-			new ContainsName(new String[] {"-WordCount.jar"}));
+		File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
 
 		Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation);
 
@@ -382,9 +380,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	public void testDetachedPerJobYarnClusterWithStreamingJob() {
 		LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
 
-		File exampleJarLocation = YarnTestBase.findFile(
-			".." + File.separator + "flink-examples" + File.separator + "flink-examples-streaming",
-			new ContainsName(new String[] {"-WordCount.jar"}));
+		File exampleJarLocation = new File("target/programs/StreamingWordCount.jar");
+
 		Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation);
 
 		testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());