You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/18 13:49:04 UTC
[6/7] flink git commit: [FLINK-2702] [examples] Add JAR packaging for DistCp example, plus minor cleanups.
[FLINK-2702] [examples] Add JAR packaging for DistCp example, plus minor cleanups.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a74fa8ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a74fa8ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a74fa8ce
Branch: refs/heads/master
Commit: a74fa8ce93472c6d022852e8a4c1827d71f1bee6
Parents: b9148b6
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 18 12:01:01 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 12:01:01 2015 +0200
----------------------------------------------------------------------
flink-examples/flink-java-examples/pom.xml | 73 ++++++--------------
.../flink/examples/java/distcp/DistCp.java | 10 ++-
.../examples/java/distcp/FileCopyTask.java | 7 +-
.../java/distcp/FileCopyTaskInputFormat.java | 8 ++-
.../java/distcp/FileCopyTaskInputSplit.java | 12 ++--
5 files changed, 50 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/pom.xml b/flink-examples/flink-java-examples/pom.xml
index 0ed2df6..d90d034 100644
--- a/flink-examples/flink-java-examples/pom.xml
+++ b/flink-examples/flink-java-examples/pom.xml
@@ -197,56 +197,7 @@ under the License.
</includes>
</configuration>
</execution>
-
- <!-- These queries are currently not self-contained -->
-
- <!-- TPC-H Query 10 -->
- <!--
- <execution>
- <id>TPCHQuery10</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>TPCHQuery10</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.examples.java.relational.TPCHQuery10</program-class>
- </manifestEntries>
- </archive>
- <includes>
- <include>**/java/relational/TPCHQuery10.class</include>
- <include>**/java/relational/TPCHQuery10$*.class</include>
- </includes>
- </configuration>
- </execution> -->
-
- <!-- TPC-H Query 3 -->
- <!--
- <execution>
- <id>TPCHQuery3</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>TPCHQuery3</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.examples.java.relational.TPCHQuery3</program-class>
- </manifestEntries>
- </archive>
- <includes>
- <include>**/java/relational/TPCHQuery3.class</include>
- <include>**/java/relational/TPCHQuery3$*.class</include>
- </includes>
- </configuration>
- </execution> -->
-
<!-- WebLogAnalysis -->
<execution>
<id>WebLogAnalysis</id>
@@ -319,7 +270,28 @@ under the License.
</includes>
</configuration>
</execution>
-
+
+ <!-- Distributed Copy -->
+ <execution>
+ <id>DistCp</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>DistCp</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.examples.java.distcp.DistCp</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>**/java/distcp/*</include>
+ </includes>
+ </configuration>
+ </execution>
</executions>
</plugin>
@@ -346,6 +318,7 @@ under the License.
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WebLogAnalysis.jar" tofile="${project.basedir}/target/WebLogAnalysis.jar" />
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WordCountPOJO.jar" tofile="${project.basedir}/target/WordCountPOJO.jar" />
+ <copy file="${project.basedir}/target/flink-java-examples-${project.version}-DistCp.jar" tofile="${project.basedir}/target/DistCp.jar" />
</target>
</configuration>
</execution>
http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
index 3eae211..08f90a6 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
@@ -19,6 +19,7 @@
package org.apache.flink.examples.java.distcp;
import org.apache.commons.io.IOUtils;
+
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
@@ -34,6 +35,7 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +56,7 @@ import java.util.Map;
* However, in a distributed environment HDFS paths must be provided both as input and output.
*/
public class DistCp {
+
private static final Logger LOGGER = LoggerFactory.getLogger(DistCp.class);
public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
@@ -61,7 +64,7 @@ public class DistCp {
public static void main(String[] args) throws Exception {
if (args.length != 3) {
printHelp();
- System.exit(1);
+ return;
}
final Path sourcePath = new Path(args[0]);
@@ -84,8 +87,9 @@ public class DistCp {
FlatMapOperator<FileCopyTask, Object> res = inputTasks.flatMap(new RichFlatMapFunction<FileCopyTask, Object>() {
- public LongCounter fileCounter;
- public LongCounter bytesCounter;
+
+ private LongCounter fileCounter;
+ private LongCounter bytesCounter;
@Override
public void open(Configuration parameters) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
index 3778775..7f38a8b 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
@@ -27,8 +27,11 @@ import java.io.Serializable;
* A Java POJO that represents a task for copying a single file
*/
public class FileCopyTask implements Serializable {
- private Path path;
- private String relativePath;
+
+ private static final long serialVersionUID = -8760082278978316032L;
+
+ private final Path path;
+ private final String relativePath;
public FileCopyTask(Path path, String relativePath) {
if (StringUtils.isEmpty(relativePath)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
index 3ac872a..6137e12 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
@@ -38,7 +38,12 @@ import java.util.Queue;
* that have finished previously assigned tasks
*/
public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCopyTaskInputSplit> {
+
+ private static final long serialVersionUID = -644394866425221151L;
+
private static final Logger LOGGER = LoggerFactory.getLogger(FileCopyTaskInputFormat.class);
+
+
private final List<FileCopyTask> tasks;
public FileCopyTaskInputFormat(List<FileCopyTask> tasks) {
@@ -74,7 +79,8 @@ public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCo
FileCopyTaskInputSplit[] splits = new FileCopyTaskInputSplit[tasks.size()];
int i = 0;
for (FileCopyTask t : tasks) {
- splits[i++] = new FileCopyTaskInputSplit(t);
+ splits[i] = new FileCopyTaskInputSplit(t, i);
+ i++;
}
return splits;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
index 8ee5e09..33943b6 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
@@ -24,11 +24,15 @@ import org.apache.flink.core.io.InputSplit;
* Implementation of {@code InputSplit} for copying files
*/
public class FileCopyTaskInputSplit implements InputSplit {
- private int splitNo = 0;
- private FileCopyTask task;
+
+ private static final long serialVersionUID = -7621656017747660450L;
+
+ private final FileCopyTask task;
+ private final int splitNumber;
- public FileCopyTaskInputSplit(FileCopyTask task) {
+ public FileCopyTaskInputSplit(FileCopyTask task, int splitNumber) {
this.task = task;
+ this.splitNumber = splitNumber;
}
public FileCopyTask getTask() {
@@ -37,6 +41,6 @@ public class FileCopyTaskInputSplit implements InputSplit {
@Override
public int getSplitNumber() {
- return splitNo++;
+ return splitNumber;
}
}