You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/07 15:00:58 UTC
[1/2] flink git commit: [FLINK-6842] [runtime] Uncomment and activate
code in HadoopFileSystem
Repository: flink
Updated Branches:
refs/heads/master d1a0935e2 -> 709f23e74
[FLINK-6842] [runtime] Uncomment and activate code in HadoopFileSystem
This closes #4219
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a76421ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a76421ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a76421ec
Branch: refs/heads/master
Commit: a76421ecadd3d1d3e0c13192a677ee4cf8e8d432
Parents: d1a0935
Author: zhangminglei <zm...@163.com>
Authored: Thu Jul 6 23:01:54 2017 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Jul 7 10:08:39 2017 -0400
----------------------------------------------------------------------
.../flink/runtime/fs/hdfs/HadoopFileSystem.java | 15 ++++++---------
1 file changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a76421ec/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index f47423f..7ab7ab7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -459,15 +459,12 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
@Override
public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
Configuration hadoopConf = getHadoopConfiguration();
- Class<? extends org.apache.hadoop.fs.FileSystem> clazz;
- // We can activate this block once we drop Hadoop1 support (only hd2 has the getFileSystemClass-method)
-// try {
-// clazz = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConf);
-// } catch (IOException e) {
-// LOG.info("Flink could not load the Hadoop File system implementation for scheme "+scheme);
-// return null;
-// }
- clazz = hadoopConf.getClass("fs." + scheme + ".impl", null, org.apache.hadoop.fs.FileSystem.class);
+ Class<? extends org.apache.hadoop.fs.FileSystem> clazz = null;
+ try {
+ clazz = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConf);
+ } catch (IOException e) {
+ LOG.info("Flink could not load the Hadoop File system implementation for scheme " + scheme);
+ }
if (clazz != null && LOG.isDebugEnabled()) {
LOG.debug("Flink supports {} with the Hadoop file system wrapper, impl {}", scheme, clazz);
[2/2] flink git commit: [FLINK-7042] [yarn] Fix jar file discovery
flink-yarn-tests
Posted by gr...@apache.org.
[FLINK-7042] [yarn] Fix jar file discovery flink-yarn-tests
Add dependencies for batch and streaming WordCount programs and copy
the jar files into a new target/programs directory. The integration
tests now directly reference the program jar files rather than relying
on the prior brittle file search.
This removes the flink-yarn-tests build-time dependency on the examples
modules (there remains a build-time dependency on flink-dist).
This closes #4264
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/709f23e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/709f23e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/709f23e7
Branch: refs/heads/master
Commit: 709f23e742b094a5337c9333a17de7dbdc924891
Parents: a76421e
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Jul 5 12:00:30 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Jul 7 10:08:52 2017 -0400
----------------------------------------------------------------------
flink-yarn-tests/pom.xml | 60 ++++++++++++++++++++
.../YARNSessionCapacitySchedulerITCase.java | 13 ++---
2 files changed, 65 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/709f23e7/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index f198ece..349283a 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -81,6 +81,24 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <classifier>WordCount</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <classifier>WordCount</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
<version>${project.version}</version>
<scope>test</scope>
@@ -351,6 +369,48 @@ under the License.
</execution>
</executions>
</plugin>
+
+ <!--
+ Copy the batch and streaming example programs into the flink-yarn-tests/target/programs
+ directory to be run during YARN integration tests.
+ -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>copy</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
+ <type>jar</type>
+ <classifier>WordCount</classifier>
+ <overWrite>true</overWrite>
+ <destFileName>BatchWordCount.jar</destFileName>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
+ <type>jar</type>
+ <classifier>WordCount</classifier>
+ <overWrite>true</overWrite>
+ <destFileName>StreamingWordCount.jar</destFileName>
+ </artifactItem>
+ </artifactItems>
+ <outputDirectory>${project.build.directory}/programs</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>true</overWriteSnapshots>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/709f23e7/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index b290756..7d6c5d6 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -115,7 +115,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
public void perJobYarnCluster() {
LOG.info("Starting perJobYarnCluster()");
addTestAppender(JobClient.class, Level.INFO);
- File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here.
+ File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
runWithArgs(new String[]{"run", "-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
@@ -339,7 +339,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
// write log messages to stdout as well, so that the runWithArgs() method
// is catching the log output
addTestAppender(JobClient.class, Level.INFO);
- File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here.
+ File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
runWithArgs(new String[]{"run",
"-p", "2", //test that the job is executed with a DOP of 2
@@ -364,9 +364,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
public void testDetachedPerJobYarnCluster() {
LOG.info("Starting testDetachedPerJobYarnCluster()");
- File exampleJarLocation = YarnTestBase.findFile(
- ".." + File.separator + "flink-examples" + File.separator + "flink-examples-batch",
- new ContainsName(new String[] {"-WordCount.jar"}));
+ File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation);
@@ -382,9 +380,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
public void testDetachedPerJobYarnClusterWithStreamingJob() {
LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
- File exampleJarLocation = YarnTestBase.findFile(
- ".." + File.separator + "flink-examples" + File.separator + "flink-examples-streaming",
- new ContainsName(new String[] {"-WordCount.jar"}));
+ File exampleJarLocation = new File("target/programs/StreamingWordCount.jar");
+
Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation);
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());