You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:07 UTC
[12/50] [abbrv] incubator-beam git commit: [cleanup] remove obsolete
code
[cleanup] remove obsolete code
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f0cb5f07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f0cb5f07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f0cb5f07
Branch: refs/heads/master
Commit: f0cb5f07361f6e6eca30fa66a1d80d205ee7d2b8
Parents: 602d8fe
Author: Max <ma...@posteo.de>
Authored: Wed Feb 17 13:23:26 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800
----------------------------------------------------------------------
.../translation/FlinkBatchTransformTranslators.java | 3 ++-
.../translation/wrappers/SourceInputFormat.java | 3 +--
.../wrappers/streaming/FlinkAbstractParDoWrapper.java | 14 --------------
.../flink/dataflow/JoinExamplesITCase.java | 7 -------
4 files changed, 3 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0cb5f07/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
index 9a43d05..d5c09b2 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
@@ -151,7 +151,8 @@ public class FlinkBatchTransformTranslators {
TypeInformation<T> typeInformation = context.getTypeInfo(output);
- DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(), new SourceInputFormat<>(source, context.getPipelineOptions(), coder), typeInformation, name);
+ DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(),
+ new SourceInputFormat<>(source, context.getPipelineOptions()), typeInformation, name);
context.setOutputDataSet(output, dataSource);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0cb5f07/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
index b3eca96..64dc072 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
@@ -49,10 +49,9 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>>
private BoundedSource.BoundedReader<T> reader = null;
private boolean reachedEnd = true;
- public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options, Coder<T> coder) {
+ public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
this.initialSource = initialSource;
this.options = options;
- Coder<T> coder1 = coder;
}
private void writeObject(ObjectOutputStream out)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0cb5f07/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index 53bb177..71f9c7f 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -55,20 +55,6 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
this.windowingStrategy = windowingStrategy;
}
-// protected void writeObject(ObjectOutputStream out)
-// throws IOException, ClassNotFoundException {
-// out.defaultWriteObject();
-// ObjectMapper mapper = new ObjectMapper();
-// mapper.writeValue(out, options);
-// }
-//
-// protected void readObject(ObjectInputStream in)
-// throws IOException, ClassNotFoundException {
-// in.defaultReadObject();
-// ObjectMapper mapper = new ObjectMapper();
-// options = mapper.readValue(in, PipelineOptions.class);
-// }
-
private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
if (this.context == null) {
this.context = new DoFnProcessContext(function, outCollector);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0cb5f07/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
index dfcadc1..ed2ecf5 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
@@ -53,13 +53,6 @@ public class JoinExamplesITCase extends JavaProgramTestBase {
};
static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS);
- private static final KV<String, String> kv1 = KV.of("VM",
- "Date: 20141212, Actor1: LAOS, url: http://www.chicagotribune.com");
- private static final KV<String, String> kv2 = KV.of("BE",
- "Date: 20141213, Actor1: AFGHANISTAN, url: http://cnn.com");
- private static final KV<String, String> kv3 = KV.of("BE", "Belgium");
- private static final KV<String, String> kv4 = KV.of("VM", "Vietnam");
-
private static final TableRow cc1 = new TableRow()
.set("FIPSCC", "VM").set("HumanName", "Vietnam");
private static final TableRow cc2 = new TableRow()