You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:41:15 UTC
[44/50] [abbrv] incubator-beam git commit: fix import order
fix import order
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59ae94c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59ae94c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59ae94c5
Branch: refs/heads/gearpump-runner
Commit: 59ae94c59931732d5cf78c5431147d580f9ff747
Parents: 6cd48c4
Author: manuzhang <ow...@gmail.com>
Authored: Mon Sep 12 11:45:15 2016 +0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 12 17:40:14 2016 -0700
----------------------------------------------------------------------
runners/gearpump/pom.xml | 7 ++---
.../gearpump/GearpumpPipelineOptions.java | 8 +++---
.../gearpump/GearpumpPipelineResult.java | 4 ++-
.../gearpump/GearpumpPipelineRunner.java | 28 ++++++++++----------
.../GearpumpPipelineRunnerRegistrar.java | 9 +++----
.../gearpump/GearpumpPipelineTranslator.java | 7 +++--
.../gearpump/examples/StreamingWordCount.java | 15 +++--------
.../gearpump/examples/UnboundedTextSource.java | 15 ++++++-----
.../translators/GroupByKeyTranslator.java | 14 +++++-----
.../translators/ParDoBoundMultiTranslator.java | 16 +++++------
.../translators/TransformTranslator.java | 3 +--
.../translators/TranslationContext.java | 6 ++---
.../translators/functions/DoFnFunction.java | 14 +++++-----
.../translators/io/BoundedSourceWrapper.java | 4 +--
.../gearpump/translators/io/GearpumpSource.java | 12 +++++----
.../translators/io/UnboundedSourceWrapper.java | 4 +--
.../gearpump/translators/io/ValuesSource.java | 12 ++++-----
.../translators/utils/GearpumpDoFnRunner.java | 28 ++++++++++----------
.../translators/utils/NoOpSideInputReader.java | 8 +++---
.../translators/utils/NoOpStepContext.java | 6 ++---
.../main/java/org/apache/beam/sdk/Pipeline.java | 2 +-
.../apache/beam/sdk/runners/PipelineRunner.java | 1 +
.../beam/sdk/transforms/DoFnAdapters.java | 5 ++++
23 files changed, 114 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index cc99a7a..296de6b 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -122,7 +122,6 @@
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-daemon_2.11</artifactId>
<version>${gearpump.version}</version>
- <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.gearpump</groupId>
@@ -186,10 +185,6 @@
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
- <groupId>com.google.http-client</groupId>
- <artifactId>google-http-client</artifactId>
- </dependency>
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
@@ -225,6 +220,7 @@
<artifactId>auto-service</artifactId>
<version>1.0-rc2</version>
</dependency>
+
</dependencies>
<build>
@@ -287,6 +283,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
+
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
index 5b6ee96..e02cbbc 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
@@ -18,17 +18,17 @@
package org.apache.beam.runners.gearpump;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.Map;
+
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
import org.apache.gearpump.cluster.client.ClientContext;
import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
-import java.util.Map;
-
/**
* Options that configure the Gearpump pipeline.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 6184bc3..2011a4b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -17,14 +17,16 @@
*/
package org.apache.beam.runners.gearpump;
+import java.io.IOException;
+
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.transforms.Aggregator;
+
import org.joda.time.Duration;
-import java.io.IOException;
/**
* Result of executing a {@link Pipeline} with Gearpump.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
index 4182ee4..ad7bb3e 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
@@ -17,6 +17,13 @@
*/
package org.apache.beam.runners.gearpump;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.beam.runners.core.AssignWindows;
import org.apache.beam.runners.gearpump.translators.TranslationContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -30,25 +37,18 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AssignWindows;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
-
import org.apache.gearpump.cluster.ClusterConfig;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.cluster.client.ClientContext;
import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* A {@link PipelineRunner} that executes the operations in the
* pipeline by first translating them to Gearpump Stream DSL
@@ -79,16 +79,16 @@ public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResul
PTransform<InputT, OutputT> transform, InputT input) {
if (Window.Bound.class.equals(transform.getClass())) {
return (OutputT) super.apply(
- new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
+ new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
} else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
- && ((PCollectionList<?>) input).size() == 0) {
- return (OutputT) Pipeline.applyTransform(input, Create.of());
+ && ((PCollectionList<?>) input).size() == 0) {
+ return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
} else if (Create.Values.class.equals(transform.getClass())) {
return (OutputT) PCollection
- .<OutputT>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.BOUNDED);
+ .<OutputT>createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.BOUNDED);
} else {
return super.apply(transform, input);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
index 2b9e89e..ca173d1 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
@@ -18,14 +18,14 @@
package org.apache.beam.runners.gearpump;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
/**
* Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
* {@link GearpumpPipelineRunner}.
@@ -44,8 +44,7 @@ public class GearpumpPipelineRunnerRegistrar {
@Override
public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
- TestGearpumpRunner.class);
+ return ImmutableList.<Class<? extends PipelineRunner<?>>>of(TestGearpumpRunner.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 59f0df7..5045ae4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -18,6 +18,8 @@
package org.apache.beam.runners.gearpump;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.beam.runners.gearpump.translators.CreateValuesTranslator;
import org.apache.beam.runners.gearpump.translators.FlattenPCollectionTranslator;
import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
@@ -41,9 +43,6 @@ import org.apache.gearpump.util.Graph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* {@link GearpumpPipelineTranslator} knows how to translate {@link Pipeline} objects
* into Gearpump {@link Graph}.
@@ -109,7 +108,7 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
@Override
public void visitValue(PValue value, TransformTreeNode producer) {
- LOG.debug("visiting value {}", value);
+ LOG.info("visiting value {}", value);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
index 5f35c6b..ba50de7 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
@@ -23,11 +23,9 @@ import org.apache.beam.runners.gearpump.GearpumpPipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
@@ -45,15 +43,9 @@ import org.slf4j.LoggerFactory;
public class StreamingWordCount {
static class ExtractWordsFn extends OldDoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
@Override
public void processElement(ProcessContext c) {
- if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
- }
-
// Split the line into words.
String[] words = c.element().split("[^a-zA-Z']+");
@@ -81,11 +73,12 @@ public class StreamingWordCount {
public static void main(String[] args) {
- GearpumpPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
- .as(GearpumpPipelineOptions.class);
- options.setApplicationName("StreamingWordCount");
+ GearpumpPipelineOptions options = PipelineOptionsFactory
+ .fromArgs(args).as(GearpumpPipelineOptions.class);
options.setRunner(GearpumpPipelineRunner.class);
+ options.setApplicationName("StreamingWordCount");
options.setParallelism(1);
+
Pipeline p = Pipeline.create(options);
PCollection<KV<String, Long>> wordCounts =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
index caf066c..b014432 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
@@ -18,13 +18,6 @@
package org.apache.beam.runners.gearpump.examples;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
@@ -33,6 +26,14 @@ import java.util.NoSuchElementException;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
+
/**
* unbounded source that reads from text.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index f36b908..43e3336 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -18,23 +18,25 @@
package org.apache.beam.runners.gearpump.translators;
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
-import com.google.common.collect.Iterables;
-
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+
/**
* {@link GroupByKey} is translated to Gearpump groupBy function.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index d5ed0d2..2b49684 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -18,6 +18,14 @@
package org.apache.beam.runners.gearpump.translators;
+import com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
@@ -25,8 +33,6 @@ import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -35,17 +41,11 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-import com.google.common.collect.Lists;
-
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.javaapi.dsl.functions.FilterFunction;
import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
/**
* {@link ParDo.BoundMulti} is translated to Gearpump flatMap function
* with {@link DoFn} wrapped in {@link DoFnMultiFunction}. The outputs are
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
index 1ed6d5d..c8587d3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
@@ -18,11 +18,10 @@
package org.apache.beam.runners.gearpump.translators;
+import java.io.Serializable;
import org.apache.beam.sdk.transforms.PTransform;
-import java.io.Serializable;
-
/**
* translates {@link PTransform} to Gearpump functions.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index b9b2c7a..d3bc75d 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -20,6 +20,9 @@ package org.apache.beam.runners.gearpump.translators;
import static com.google.common.base.Preconditions.checkArgument;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -33,9 +36,6 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
import org.apache.gearpump.streaming.source.DataSource;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* Maintains context data for {@link TransformTranslator}s.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index b1ebd2a..8d16356 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -18,26 +18,26 @@
package org.apache.beam.runners.gearpump.translators.functions;
+import com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-import com.google.api.client.util.Lists;
-
import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
-import java.util.Iterator;
-import java.util.List;
-
/**
* Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
index f25d113..f889101 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
@@ -18,12 +18,12 @@
package org.apache.beam.runners.gearpump.translators.io;
+import java.io.IOException;
+
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
-import java.io.IOException;
-
/**
* wrapper over BoundedSource for Gearpump DataSource API.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 892ccc3..8f2beb2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -18,23 +18,23 @@
package org.apache.beam.runners.gearpump.translators.io;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import org.apache.gearpump.Message;
import org.apache.gearpump.streaming.source.DataSource;
import org.apache.gearpump.streaming.task.TaskContext;
import org.joda.time.Instant;
-import java.io.IOException;
-
/**
* common methods for {@link BoundedSourceWrapper} and {@link UnboundedSourceWrapper}.
*/
@@ -61,6 +61,7 @@ public abstract class GearpumpSource<T> implements DataSource {
PipelineOptions options = new ObjectMapper()
.readValue(serializedOptions, PipelineOptions.class);
this.reader = createReader(options);
+ this.available = reader.start();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
@@ -97,4 +98,5 @@ public abstract class GearpumpSource<T> implements DataSource {
throw new RuntimeException(e);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
index b39f29f..dfdecb2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
@@ -18,12 +18,12 @@
package org.apache.beam.runners.gearpump.translators.io;
+import java.io.IOException;
+
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
-import java.io.IOException;
-
/**
* wrapper over UnboundedSource for Gearpump DataSource API.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index 24055f7..9359e35 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -18,12 +18,6 @@
package org.apache.beam.runners.gearpump.translators.io;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -36,6 +30,12 @@ import java.util.NoSuchElementException;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
/**
* unbounded source that reads from a Java {@link Iterable}.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
index be0d025..e205575 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
@@ -18,6 +18,20 @@
package org.apache.beam.runners.gearpump.translators.utils;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
@@ -31,11 +45,8 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.SimpleDoFnRunner;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.UserCodeException;
@@ -46,19 +57,8 @@ import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
import org.joda.time.Instant;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
/**
* a serializable {@link SimpleDoFnRunner}.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
index 600ebfb..d1a9198 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
@@ -18,14 +18,14 @@
package org.apache.beam.runners.gearpump.translators.utils;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.values.PCollectionView;
-
import java.io.Serializable;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
/**
* no-op side input reader.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
index ce0935a..45f146b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
@@ -18,6 +18,9 @@
package org.apache.beam.runners.gearpump.translators.utils;
+import java.io.IOException;
+import java.io.Serializable;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ExecutionContext;
@@ -26,9 +29,6 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.TupleTag;
-import java.io.IOException;
-import java.io.Serializable;
-
/**
* serializable {@link ExecutionContext.StepContext} that basically does nothing.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 53f46f6..e95304d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -134,7 +134,7 @@ public class Pipeline {
*/
public static Pipeline create(PipelineOptions options) {
Pipeline pipeline = new Pipeline(PipelineRunner.fromOptions(options), options);
- LOG.debug("Creating {}", pipeline);
+ LOG.info("Creating {}", pipeline);
return pipeline;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index ede1507..1ec4103 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -57,6 +57,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
.fromFactoryMethod("fromOptions")
.withArg(PipelineOptions.class, options)
.build();
+ System.out.println("runner: " + result.getClass().getName());
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 4803d77..642971f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import java.io.IOException;
+
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -31,6 +32,8 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Utility class containing adapters for running a {@link DoFn} as an {@link OldDoFn}.
@@ -72,6 +75,8 @@ public class DoFnAdapters {
private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
private final DoFn<InputT, OutputT> fn;
private transient DoFnInvoker<InputT, OutputT> invoker;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SimpleDoFnAdapter.class);
SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
super(fn.aggregators);