You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/18 13:49:04 UTC

[6/7] flink git commit: [FLINK-2702] [examples] Add JAR packaging for DistCp example, plus minor cleanups.

[FLINK-2702] [examples] Add JAR packaging for DistCp example, plus minor cleanups.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a74fa8ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a74fa8ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a74fa8ce

Branch: refs/heads/master
Commit: a74fa8ce93472c6d022852e8a4c1827d71f1bee6
Parents: b9148b6
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 18 12:01:01 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 18 12:01:01 2015 +0200

----------------------------------------------------------------------
 flink-examples/flink-java-examples/pom.xml      | 73 ++++++--------------
 .../flink/examples/java/distcp/DistCp.java      | 10 ++-
 .../examples/java/distcp/FileCopyTask.java      |  7 +-
 .../java/distcp/FileCopyTaskInputFormat.java    |  8 ++-
 .../java/distcp/FileCopyTaskInputSplit.java     | 12 ++--
 5 files changed, 50 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/pom.xml b/flink-examples/flink-java-examples/pom.xml
index 0ed2df6..d90d034 100644
--- a/flink-examples/flink-java-examples/pom.xml
+++ b/flink-examples/flink-java-examples/pom.xml
@@ -197,56 +197,7 @@ under the License.
 							</includes>
 						</configuration>
 					</execution>
-					
-					<!-- These queries are currently not self-contained -->
-
-					<!-- TPC-H Query 10 -->
 
-					<!--
-					<execution>
-						<id>TPCHQuery10</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>TPCHQuery10</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.relational.TPCHQuery10</program-class>
-								</manifestEntries>
-							</archive>
-							<includes>
-								<include>**/java/relational/TPCHQuery10.class</include>
-								<include>**/java/relational/TPCHQuery10$*.class</include>
-							</includes>
-						</configuration>
-					</execution> -->
-				
-					<!-- TPC-H Query 3 -->
-					<!--
-					<execution>
-						<id>TPCHQuery3</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>TPCHQuery3</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.relational.TPCHQuery3</program-class>
-								</manifestEntries>
-							</archive>
-							<includes>
-								<include>**/java/relational/TPCHQuery3.class</include>
-								<include>**/java/relational/TPCHQuery3$*.class</include>
-							</includes>
-						</configuration>
-					</execution> -->
-		
 					<!-- WebLogAnalysis -->
 					<execution>
 						<id>WebLogAnalysis</id>
@@ -319,7 +270,28 @@ under the License.
 							</includes>
 						</configuration>
 					</execution>
-					
+
+					<!-- Distributed Copy -->
+					<execution>
+						<id>DistCp</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>DistCp</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.examples.java.distcp.DistCp</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>**/java/distcp/*</include>
+							</includes>
+						</configuration>
+					</execution>
 				</executions>
 			</plugin>
 
@@ -346,6 +318,7 @@ under the License.
 								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WebLogAnalysis.jar" tofile="${project.basedir}/target/WebLogAnalysis.jar" />
 								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
 								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WordCountPOJO.jar" tofile="${project.basedir}/target/WordCountPOJO.jar" />
+								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-DistCp.jar" tofile="${project.basedir}/target/DistCp.jar" />
 							</target>
 						</configuration>
 					</execution>

http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
index 3eae211..08f90a6 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
@@ -19,6 +19,7 @@
 package org.apache.flink.examples.java.distcp;
 
 import org.apache.commons.io.IOUtils;
+
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -34,6 +35,7 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,7 @@ import java.util.Map;
  * However, in a distributed environment HDFS paths must be provided both as input and output.
  */
 public class DistCp {
+	
 	private static final Logger LOGGER = LoggerFactory.getLogger(DistCp.class);
 	public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
 	public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
@@ -61,7 +64,7 @@ public class DistCp {
 	public static void main(String[] args) throws Exception {
 		if (args.length != 3) {
 			printHelp();
-			System.exit(1);
+			return;
 		}
 
 		final Path sourcePath = new Path(args[0]);
@@ -84,8 +87,9 @@ public class DistCp {
 
 
 		FlatMapOperator<FileCopyTask, Object> res = inputTasks.flatMap(new RichFlatMapFunction<FileCopyTask, Object>() {
-			public LongCounter fileCounter;
-			public LongCounter bytesCounter;
+			
+			private LongCounter fileCounter;
+			private LongCounter bytesCounter;
 
 			@Override
 			public void open(Configuration parameters) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
index 3778775..7f38a8b 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
@@ -27,8 +27,11 @@ import java.io.Serializable;
  * A Java POJO that represents a task for copying a single file
  */
 public class FileCopyTask implements Serializable {
-	private Path path;
-	private String relativePath;
+	
+	private static final long serialVersionUID = -8760082278978316032L;
+	
+	private final Path path;
+	private final String relativePath;
 
 	public FileCopyTask(Path path, String relativePath) {
 		if (StringUtils.isEmpty(relativePath)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
index 3ac872a..6137e12 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
@@ -38,7 +38,12 @@ import java.util.Queue;
  * that have finished previously assigned tasks
  */
 public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCopyTaskInputSplit> {
+
+	private static final long serialVersionUID = -644394866425221151L;
+	
 	private static final Logger LOGGER = LoggerFactory.getLogger(FileCopyTaskInputFormat.class);
+	
+
 	private final List<FileCopyTask> tasks;
 
 	public FileCopyTaskInputFormat(List<FileCopyTask> tasks) {
@@ -74,7 +79,8 @@ public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCo
 		FileCopyTaskInputSplit[] splits = new FileCopyTaskInputSplit[tasks.size()];
 		int i = 0;
 		for (FileCopyTask t : tasks) {
-			splits[i++] = new FileCopyTaskInputSplit(t);
+			splits[i] = new FileCopyTaskInputSplit(t, i);
+			i++;
 		}
 		return splits;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a74fa8ce/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
index 8ee5e09..33943b6 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
@@ -24,11 +24,15 @@ import org.apache.flink.core.io.InputSplit;
  * Implementation of {@code InputSplit} for copying files
  */
 public class FileCopyTaskInputSplit implements InputSplit {
-	private int splitNo = 0;
-	private FileCopyTask task;
+	
+	private static final long serialVersionUID = -7621656017747660450L;
+	
+	private final FileCopyTask task;
+	private final int splitNumber;
 
-	public FileCopyTaskInputSplit(FileCopyTask task) {
+	public FileCopyTaskInputSplit(FileCopyTask task, int splitNumber) {
 		this.task = task;
+		this.splitNumber = splitNumber;
 	}
 
 	public FileCopyTask getTask() {
@@ -37,6 +41,6 @@ public class FileCopyTaskInputSplit implements InputSplit {
 
 	@Override
 	public int getSplitNumber() {
-		return splitNo++;
+		return splitNumber;
 	}
 }