You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:07 UTC

[12/50] [abbrv] incubator-beam git commit: [cleanup] remove obsolete code

[cleanup] remove obsolete code


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f0cb5f07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f0cb5f07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f0cb5f07

Branch: refs/heads/master
Commit: f0cb5f07361f6e6eca30fa66a1d80d205ee7d2b8
Parents: 602d8fe
Author: Max <ma...@posteo.de>
Authored: Wed Feb 17 13:23:26 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../translation/FlinkBatchTransformTranslators.java   |  3 ++-
 .../translation/wrappers/SourceInputFormat.java       |  3 +--
 .../wrappers/streaming/FlinkAbstractParDoWrapper.java | 14 --------------
 .../flink/dataflow/JoinExamplesITCase.java            |  7 -------
 4 files changed, 3 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0cb5f07/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
index 9a43d05..d5c09b2 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
@@ -151,7 +151,8 @@ public class FlinkBatchTransformTranslators {
 
 			TypeInformation<T> typeInformation = context.getTypeInfo(output);
 
-			DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(), new SourceInputFormat<>(source, context.getPipelineOptions(), coder), typeInformation, name);
+			DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(),
+					new SourceInputFormat<>(source, context.getPipelineOptions()), typeInformation, name);
 
 			context.setOutputDataSet(output, dataSource);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0cb5f07/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
index b3eca96..64dc072 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
@@ -49,10 +49,9 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>>
 	private BoundedSource.BoundedReader<T> reader = null;
 	private boolean reachedEnd = true;
 
-	public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options, Coder<T> coder) {
+	public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
 		this.initialSource = initialSource;
 		this.options = options;
-		Coder<T> coder1 = coder;
 	}
 
 	private void writeObject(ObjectOutputStream out)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0cb5f07/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index 53bb177..71f9c7f 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -55,20 +55,6 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
 		this.windowingStrategy = windowingStrategy;
 	}
 
-//	protected void writeObject(ObjectOutputStream out)
-//			throws IOException, ClassNotFoundException {
-//		out.defaultWriteObject();
-//		ObjectMapper mapper = new ObjectMapper();
-//		mapper.writeValue(out, options);
-//	}
-//
-//	protected void readObject(ObjectInputStream in)
-//			throws IOException, ClassNotFoundException {
-//		in.defaultReadObject();
-//		ObjectMapper mapper = new ObjectMapper();
-//		options = mapper.readValue(in, PipelineOptions.class);
-//	}
-
 	private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
 		if (this.context == null) {
 			this.context = new DoFnProcessContext(function, outCollector);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0cb5f07/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
index dfcadc1..ed2ecf5 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
@@ -53,13 +53,6 @@ public class JoinExamplesITCase extends JavaProgramTestBase {
 	};
 	static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS);
 
-	private static final KV<String, String> kv1 = KV.of("VM",
-			"Date: 20141212, Actor1: LAOS, url: http://www.chicagotribune.com");
-	private static final KV<String, String> kv2 = KV.of("BE",
-			"Date: 20141213, Actor1: AFGHANISTAN, url: http://cnn.com");
-	private static final KV<String, String> kv3 = KV.of("BE", "Belgium");
-	private static final KV<String, String> kv4 = KV.of("VM", "Vietnam");
-
 	private static final TableRow cc1 = new TableRow()
 			.set("FIPSCC", "VM").set("HumanName", "Vietnam");
 	private static final TableRow cc2 = new TableRow()