You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/06 20:07:59 UTC
[4/5] beam git commit: Reduce visibility of many Dataflow runner internals
Reduce visibility of many Dataflow runner internals
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33907f89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33907f89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33907f89
Branch: refs/heads/master
Commit: 33907f8908238199b166070bc1e12796af32829a
Parents: 5d2cb3e
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 5 17:15:52 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 11:36:51 2017 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/AssignWindows.java | 89 +++
.../dataflow/DataflowAggregatorTransforms.java | 79 +++
.../dataflow/DataflowMetricUpdateExtractor.java | 109 ++++
.../runners/dataflow/DataflowPipelineJob.java | 2 -
.../dataflow/DataflowPipelineTranslator.java | 3 +-
.../beam/runners/dataflow/DataflowRunner.java | 4 -
.../DataflowUnboundedReadFromBoundedSource.java | 547 +++++++++++++++++++
.../beam/runners/dataflow/ReadTranslator.java | 102 ++++
.../runners/dataflow/TransformTranslator.java | 2 +-
.../dataflow/internal/AssignWindows.java | 89 ---
.../internal/DataflowAggregatorTransforms.java | 79 ---
.../internal/DataflowMetricUpdateExtractor.java | 109 ----
.../DataflowUnboundedReadFromBoundedSource.java | 547 -------------------
.../dataflow/internal/ReadTranslator.java | 102 ----
.../dataflow/DataflowPipelineJobTest.java | 1 -
...aflowUnboundedReadFromBoundedSourceTest.java | 79 +++
...aflowUnboundedReadFromBoundedSourceTest.java | 79 ---
17 files changed, 1007 insertions(+), 1015 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
new file mode 100644
index 0000000..880cd26
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A {@link PTransform} that the Dataflow runner substitutes for {@link Window#into(WindowFn)},
+ * i.e. for the {@link Window.Bound} {@link PTransform} passed to its constructor.
+ *
+ * <p>If the wrapped transform specifies a {@link WindowFn} ({@code getWindowFn() != null}), the
+ * output is a primitive {@link PCollection} and window assignment is done by the Dataflow service.
+ *
+ * <p>Otherwise no new {@link WindowFn} is assigned: an identity {@link ParDo} named "Identity"
+ * re-emits every element and the output {@link PCollection} gets the windowing strategy computed
+ * by {@link Window.Bound#getOutputStrategyInternal} from the input's windowing strategy.
+ *
+ * <p>For internal use only.
+ *
+ * @param <T> the element type of both the input and the output {@link PCollection}
+ */
+class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
+ private final Window.Bound<T> transform; // the Window.into() transform this class overrides
+
+ /**
+ * Builds an instance of this class from the overridden {@link Window.Bound} transform.
+ */
+ @SuppressWarnings("unused") // Used via reflection
+ public AssignWindows(Window.Bound<T> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollection<T> expand(PCollection<T> input) {
+ WindowingStrategy<?, ?> outputStrategy =
+ transform.getOutputStrategyInternal(input.getWindowingStrategy());
+ if (transform.getWindowFn() != null) {
+ // A WindowFn was specified: emit a primitive output; the Dataflow service assigns windows.
+ return PCollection.<T>createPrimitiveOutputInternal(
+ input.getPipeline(), outputStrategy, input.isBounded());
+ } else {
+ // No WindowFn was specified: pass every element through unchanged, then attach the
+ // updated windowing strategy to the output.
+ return input.apply("Identity", ParDo.of(new DoFn<T, T>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(c.element());
+ }
+ })).setWindowingStrategyInternal(outputStrategy);
+ }
+ }
+
+ @Override
+ public void validate(PCollection<T> input) {
+ transform.validate(input); // delegate to the overridden transform's validation
+ }
+
+ @Override
+ protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
+ return input.getCoder(); // the output reuses the input's element coder
+ }
+
+ @Override
+ protected String getKindString() {
+ return "Window.Into()";
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
new file mode 100755
index 0000000..0198cca
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Maps each {@link Aggregator} to the {@link PTransform}s that use it and to their step names.
+ */
+class DataflowAggregatorTransforms {
+ private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms;
+ private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms;
+ private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames;
+
+ public DataflowAggregatorTransforms(
+ Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms,
+ Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
+ this.aggregatorTransforms = aggregatorTransforms; // stored as-is, not copied
+ appliedStepNames = HashBiMap.create(transformStepNames); // rejects duplicate step names
+ // Index each PTransform to every application of it that was assigned a step name.
+ transformAppliedTransforms = HashMultimap.create();
+ for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) {
+ transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform);
+ }
+ }
+
+ /**
+ * Returns true if the provided {@link Aggregator} is used in the constructed {@link Pipeline}.
+ */
+ public boolean contains(Aggregator<?, ?> aggregator) {
+ return aggregatorTransforms.containsKey(aggregator);
+ }
+
+ /**
+ * Gets the distinct step names that use {@code aggregator}, which must satisfy {@link #contains}.
+ */
+ public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) {
+ Collection<String> names = new HashSet<>();
+ Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator);
+ for (PTransform<?, ?> transform : transforms) { // NPE here if aggregator is unknown
+ for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) {
+ names.add(appliedStepNames.get(applied));
+ }
+ }
+ return names;
+ }
+
+ /**
+ * Gets the {@link AppliedPTransform} assigned the given step name, or {@code null} if none.
+ */
+ public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) {
+ return appliedStepNames.inverse().get(stepName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
new file mode 100755
index 0000000..f725c46
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Static helpers for extracting the values of an {@link Aggregator} from a collection of {@link
+ * MetricUpdate MetricUpdates} returned by the Dataflow API.
+ */
+final class DataflowMetricUpdateExtractor {
+ private static final String STEP_NAME_CONTEXT_KEY = "step"; // context key naming the step
+ private static final String IS_TENTATIVE_KEY = "tentative"; // context key; "true" = tentative
+
+ private DataflowMetricUpdateExtractor() {
+ // Do not instantiate.
+ }
+
+ /**
+ * Extracts the values of {@code aggregator} from {@code metricUpdates}, keyed by the full name of
+ * each {@link PTransform} application using it (per {@link DataflowAggregatorTransforms}). Returns
+ * an empty map if {@code metricUpdates} is null; tentative values take precedence over others.
+ */
+ public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator,
+ DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) {
+ Map<String, OutputT> results = new HashMap<>();
+ if (metricUpdates == null) {
+ return results;
+ }
+
+ String aggregatorName = aggregator.getName();
+ Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator);
+
+ for (MetricUpdate metricUpdate : metricUpdates) {
+ MetricStructuredName metricStructuredName = metricUpdate.getName();
+ Map<String, String> context = metricStructuredName.getContext();
+ if (metricStructuredName.getName().equals(aggregatorName) && context != null
+ && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) {
+ AppliedPTransform<?, ?, ?> transform =
+ aggregatorTransforms.getAppliedTransformForStepName(
+ context.get(STEP_NAME_CONTEXT_KEY));
+ String fullName = transform.getFullName(); // non-null: step came from aggregatorSteps
+ // A tentative (fresher) value replaces any earlier one; otherwise keep the first seen.
+ if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) {
+ results.put(fullName, toValue(aggregator, metricUpdate));
+ }
+ }
+ }
+
+ return results;
+
+ }
+
+ private static <OutputT> OutputT toValue(
+ Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) {
+ CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn();
+ Class<? super OutputT> outputType = combineFn.getOutputType().getRawType();
+ // Supported outputs: Long, Integer, Double; the numeric scalar is converted to that type.
+ if (outputType.equals(Long.class)) {
+ @SuppressWarnings("unchecked")
+ OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue());
+ return asLong;
+ }
+ if (outputType.equals(Integer.class)) {
+ @SuppressWarnings("unchecked")
+ OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue());
+ return asInt;
+ }
+ if (outputType.equals(Double.class)) {
+ @SuppressWarnings("unchecked")
+ OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue());
+ return asDouble;
+ }
+ throw new UnsupportedOperationException(
+ "Unsupported Output Type " + outputType + " in aggregator " + aggregator);
+ }
+
+ private static Number toNumber(MetricUpdate update) { // the scalar must be a Number
+ if (update.getScalar() instanceof Number) {
+ return (Number) update.getScalar();
+ }
+ throw new IllegalArgumentException(
+ "Metric Update " + update + " does not have a numeric scalar");
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 00c88f9..0da7137 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -35,8 +35,6 @@ import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
-import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.AggregatorRetrievalException;
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index e9cf6f4..8e5901e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -58,7 +58,6 @@ import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly;
import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
-import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
import org.apache.beam.runners.dataflow.util.OutputReference;
@@ -106,7 +105,7 @@ import org.slf4j.LoggerFactory;
* into Cloud Dataflow Service API {@link Job}s.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
-public class DataflowPipelineTranslator {
+class DataflowPipelineTranslator {
// Must be kept in sync with their internal counterparts.
private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 9da7d24..9ff856a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -72,14 +72,10 @@ import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
-import org.apache.beam.runners.dataflow.internal.AssignWindows;
-import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
-import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
-import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
new file mode 100644
index 0000000..cfb5ebc
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PTransform} that reads a {@link BoundedSource} as an {@link UnboundedSource}.
+ *
+ * <p>Apart from one optional up-front {@link BoundedSource#splitIntoBundles} call for initial
+ * splits, the {@link BoundedSource} is read directly and element timestamps are propagated. While
+ * any elements remain, the watermark is {@link BoundedWindow#TIMESTAMP_MIN_VALUE}; once all are
+ * produced it moves to the end of time, {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ *
+ * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on the reader of
+ * the inner {@link BoundedSource}.
+ * Sources that cannot be split are read entirely into memory, so this transform does not work well
+ * with large, unsplittable sources.
+ *
+ * <p>This transform is intended to be used by a runner during pipeline translation to convert
+ * a Read.Bounded into a Read.Unbounded.
+ *
+ * @deprecated This class is copied from Beam runners-core to avoid a pipeline-construction-time
+ * dependency on it. It should be replaced by an execution-time dependency in the Dataflow worker.
+ */
+@Deprecated
+class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
+
+ private final BoundedSource<T> source; // wrapped in a BoundedToUnboundedSourceAdapter on expand
+
+ /**
+ * Constructs a {@link PTransform} that reads {@code source} through an {@link UnboundedSource} adapter.
+ */
+ public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) {
+ this.source = source;
+ }
+
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ return input.getPipeline().apply( // unbounded Read of the adapted bounded source
+ Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
+ }
+
+ @Override
+ protected Coder<T> getDefaultOutputCoder() {
+ return source.getDefaultOutputCoder(); // same coder as the wrapped source
+ }
+
+ @Override
+ public String getKindString() { // "Read(<approximate source name>)"
+ return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource"));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ // Deliberately skip super.populateDisplayData(); describe the wrapped source instead.
+ builder
+ .add(DisplayData.item("source", source.getClass()))
+ .include("source", source);
+ }
+
+ /**
+ * A {@code BoundedSource} to {@code UnboundedSource} adapter.
+ */
+ @VisibleForTesting
+ public static class BoundedToUnboundedSourceAdapter<T>
+ extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
+
+ private BoundedSource<T> boundedSource; // the bounded source being adapted
+
+ public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
+ this.boundedSource = boundedSource;
+ }
+
+ @Override
+ public void validate() {
+ boundedSource.validate(); // delegate to the wrapped source
+ }
+
+ @Override
+ public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ try { // any exception (e.g. desiredNumSplits == 0) falls back to one unsplit adapter
+ long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
+ if (desiredBundleSize <= 0) { // also hit when size < desiredNumSplits (integer division)
+ LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
+ boundedSource);
+ return ImmutableList.of(this);
+ }
+ List<? extends BoundedSource<T>> splits =
+ boundedSource.splitIntoBundles(desiredBundleSize, options);
+ if (splits == null) { // treated as "cannot split": keep the source whole
+ LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
+ return ImmutableList.of(this);
+ }
+ return Lists.transform( // NOTE(review): lazy view; each get() builds a new adapter
+ splits,
+ new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
+ @Override
+ public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
+ return new BoundedToUnboundedSourceAdapter<>(input);
+ }});
+ } catch (Exception e) {
+ LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
+ return ImmutableList.of(this);
+ }
+ }
+
+ @Override
+ public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
+ throws IOException {
+ if (checkpoint == null) { // fresh start: read the whole wrapped source
+ return new Reader(null /* residualElements */, boundedSource, options);
+ } else { // resume from the checkpoint's remaining elements and remaining source
+ return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
+ }
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return boundedSource.getDefaultOutputCoder(); // same coder as the wrapped source
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
+ return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder()); // element coder = source's
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder); // keep base-class items, then describe the wrapped source
+ builder.add(DisplayData.item("source", boundedSource.getClass()));
+ builder.include("source", boundedSource);
+ }
+
+ @VisibleForTesting
+ static class Checkpoint<T> implements UnboundedSource.CheckpointMark { // state to resume reading
+ private final @Nullable List<TimestampedValue<T>> residualElements; // remaining elements, or null
+ private final @Nullable BoundedSource<T> residualSource; // remaining source, or null
+
+ public Checkpoint(
+ @Nullable List<TimestampedValue<T>> residualElements,
+ @Nullable BoundedSource<T> residualSource) {
+ this.residualElements = residualElements;
+ this.residualSource = residualSource;
+ }
+
+ @Override
+ public void finalizeCheckpoint() {} // no-op
+
+ @VisibleForTesting
+ @Nullable List<TimestampedValue<T>> getResidualElements() {
+ return residualElements;
+ }
+
+ @VisibleForTesting
+ @Nullable BoundedSource<T> getResidualSource() {
+ return residualSource;
+ }
+ }
+
+ @VisibleForTesting
+ static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
+ // Wire format: nullable element list (nested context), then nullable Java-serialized source.
+ @JsonCreator
+ public static CheckpointCoder<?> of( // factory used when the coder is created from JSON
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+ List<Coder<?>> components) {
+ checkArgument(components.size() == 1,
+ "Expecting 1 components, got %s", components.size());
+ return new CheckpointCoder<>(components.get(0));
+ }
+
+ // Nullable coder for the list of residual elements and their timestamps.
+ private final Coder<List<TimestampedValue<T>>> elemsCoder;
+ // Coder for each element (the source's default output coder); the only coder argument.
+ private final Coder<T> elemCoder;
+ // Nullable, Java-serialization-based coder for the residual BoundedSource.
+ @SuppressWarnings("rawtypes")
+ private final Coder<BoundedSource> sourceCoder;
+
+ CheckpointCoder(Coder<T> elemCoder) {
+ this.elemsCoder = NullableCoder.of(
+ ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
+ this.elemCoder = elemCoder;
+ this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
+ }
+
+ @Override
+ public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ elemsCoder.encode(value.residualElements, outStream, context.nested()); // source follows
+ sourceCoder.encode(value.residualSource, outStream, context); // last field: outer context
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Checkpoint<T> decode(InputStream inStream, Context context)
+ throws CoderException, IOException {
+ return new Checkpoint<>(
+ elemsCoder.decode(inStream, context.nested()), // same order as encode()
+ sourceCoder.decode(inStream, context));
+ }
+
+ @Override
+ public List<Coder<?>> getCoderArguments() {
+ return Arrays.<Coder<?>>asList(elemCoder); // the source coder is fixed, not an argument
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(this,
+ "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
+ }
+ }
+
+ /**
+ * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
+ * {@link ResidualElements} and {@link ResidualSource}.
+ *
+ * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains
+ * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will
+ * be split into {@link ResidualElements} and {@link ResidualSource}.
+ */
+ @VisibleForTesting
+ class Reader extends UnboundedReader<T> {
+ private ResidualElements residualElements; // buffered elements; never null after init()
+ private @Nullable ResidualSource residualSource; // null when no source remains to read
+ private final PipelineOptions options;
+ private boolean done; // set once both parts are exhausted; drives getWatermark()
+
+ Reader(
+ @Nullable List<TimestampedValue<T>> residualElementsList,
+ @Nullable BoundedSource<T> residualSource,
+ PipelineOptions options) {
+ init(residualElementsList, residualSource, options);
+ this.options = checkNotNull(options, "options"); // NOTE(review): checked after init() used it
+ this.done = false;
+ }
+
+ private void init( // also re-run after each checkpoint to reset the reader's state
+ @Nullable List<TimestampedValue<T>> residualElementsList,
+ @Nullable BoundedSource<T> residualSource,
+ PipelineOptions options) {
+ this.residualElements = residualElementsList == null // null: nothing buffered
+ ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
+ : new ResidualElements(residualElementsList);
+ this.residualSource = // null: no source left to read
+ residualSource == null ? null : new ResidualSource(residualSource, options);
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return advance(); // the first element is read exactly like any later one
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (residualElements.advance()) { // drain buffered elements before the source
+ return true;
+ } else if (residualSource != null && residualSource.advance()) {
+ return true;
+ } else {
+ done = true; // getWatermark() now reports the end of time
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (residualSource != null) { // null when there was no source left to read
+ residualSource.close();
+ }
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ if (residualElements.hasCurrent()) { // a buffered element takes precedence
+ return residualElements.getCurrent();
+ } else if (residualSource != null) {
+ return residualSource.getCurrent();
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ if (residualElements.hasCurrent()) {
+ return residualElements.getCurrentTimestamp();
+ } else if (residualSource != null) {
+ return residualSource.getCurrentTimestamp();
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>If only part of the {@link ResidualElements} is consumed, the new
+ * checkpoint will contain the remaining elements in {@link ResidualElements} and
+ * the {@link ResidualSource}.
+ *
+ * <p>If all {@link ResidualElements} and part of the
+ * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
+ * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
+ * {@link ResidualSource} is the source split from the current source,
+ * and {@link ResidualElements} contains rest elements from the current source after
+ * the splitting. For unsplittable source, it will put all remaining elements into
+ * the {@link ResidualElements}.
+ */
+ @Override
+ public Checkpoint<T> getCheckpointMark() {
+ Checkpoint<T> newCheckpoint;
+ if (!residualElements.done()) {
+ // Part of residualElements are consumed.
+ // Checkpoints the remaining elements and residualSource.
+ newCheckpoint = new Checkpoint<>(
+ residualElements.getRestElements(),
+ residualSource == null ? null : residualSource.getSource());
+ } else if (residualSource != null) {
+ newCheckpoint = residualSource.getCheckpointMark();
+ } else {
+ newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
+ }
+ // Re-initialize since the residualElements and the residualSource might be
+ // consumed or split by checkpointing.
+ init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
+ return newCheckpoint;
+ }
+
+ @Override
+ public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
+ return BoundedToUnboundedSourceAdapter.this;
+ }
+ }
+
+ /**
+ * Walks an in-memory list of timestamped elements, remembering the element most recently
+ * produced by {@link #advance()}.
+ */
+ private class ResidualElements {
+ private final List<TimestampedValue<T>> elementsList;
+ // Created lazily on the first advance(); null means nothing has been consumed yet.
+ private @Nullable Iterator<TimestampedValue<T>> elementsIterator = null;
+ private @Nullable TimestampedValue<T> currentT = null;
+ private boolean hasCurrent = false;
+ // Becomes true the first time advance() runs off the end of the list.
+ private boolean done = false;
+
+ ResidualElements(List<TimestampedValue<T>> residualElementsList) {
+ this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
+ }
+
+ /** Moves to the next buffered element; returns false (and marks done) when none remain. */
+ public boolean advance() {
+ if (elementsIterator == null) {
+ elementsIterator = elementsList.iterator();
+ }
+ if (!elementsIterator.hasNext()) {
+ done = true;
+ hasCurrent = false;
+ return false;
+ }
+ currentT = elementsIterator.next();
+ hasCurrent = true;
+ return true;
+ }
+
+ boolean hasCurrent() {
+ return hasCurrent;
+ }
+
+ boolean done() {
+ return done;
+ }
+
+ /** Returns the current element, or throws {@link NoSuchElementException} if there is none. */
+ TimestampedValue<T> getCurrentTimestampedValue() {
+ if (hasCurrent) {
+ return currentT;
+ }
+ throw new NoSuchElementException();
+ }
+
+ T getCurrent() {
+ TimestampedValue<T> current = getCurrentTimestampedValue();
+ return current.getValue();
+ }
+
+ Instant getCurrentTimestamp() {
+ TimestampedValue<T> current = getCurrentTimestampedValue();
+ return current.getTimestamp();
+ }
+
+ /**
+ * Returns the elements not yet consumed. If iteration has started, this drains the iterator
+ * into a fresh list; otherwise it returns the original backing list.
+ */
+ List<TimestampedValue<T>> getRestElements() {
+ if (elementsIterator == null) {
+ return elementsList;
+ }
+ List<TimestampedValue<T>> remaining = Lists.newArrayList(elementsIterator);
+ return remaining;
+ }
+ }
+
+ /**
+ * Lazily reads a {@code BoundedSource<T>} and, when checkpointed, splits off the unread
+ * remainder of that source.
+ */
+ private class ResidualSource {
+ private BoundedSource<T> residualSource;
+ private PipelineOptions options;
+ private @Nullable BoundedReader<T> reader;
+ private boolean closed;
+ // True once reader.start() or reader.advance() has returned false. A BoundedReader must not be
+ // advanced again after reporting exhaustion, so later advance() calls short-circuit.
+ private boolean readerDone;
+
+ public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
+ this.residualSource = checkNotNull(residualSource, "residualSource");
+ this.options = checkNotNull(options, "options");
+ this.reader = null;
+ this.closed = false;
+ this.readerDone = false;
+ }
+
+ /**
+ * Moves to the next element of the source, creating and starting the reader on first use.
+ *
+ * @return true if a current element is available
+ * @throws IllegalStateException if this source has already been closed
+ */
+ private boolean advance() throws IOException {
+ if (closed) {
+ // close() clears the reader; fail clearly instead of dereferencing null.
+ throw new IllegalStateException("advance() called on a closed ResidualSource");
+ }
+ if (readerDone) {
+ return false;
+ }
+ if (reader == null) {
+ reader = residualSource.createReader(options);
+ readerDone = !reader.start();
+ } else {
+ readerDone = !reader.advance();
+ }
+ return !readerDone;
+ }
+
+ T getCurrent() throws NoSuchElementException {
+ if (reader == null) {
+ throw new NoSuchElementException();
+ }
+ return reader.getCurrent();
+ }
+
+ Instant getCurrentTimestamp() throws NoSuchElementException {
+ if (reader == null) {
+ throw new NoSuchElementException();
+ }
+ return reader.getCurrentTimestamp();
+ }
+
+ /** Closes the reader if one was opened and marks this source closed. */
+ void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ closed = true;
+ }
+
+ BoundedSource<T> getSource() {
+ return residualSource;
+ }
+
+ /**
+ * Checkpoints this source. If reading has not started, the whole source is checkpointed.
+ * Otherwise the reader is asked to split off its unread remainder. Elements the reader still
+ * owns after the split (all of them, if no split succeeds) are read eagerly into memory.
+ */
+ Checkpoint<T> getCheckpointMark() {
+ if (reader == null) {
+ // Reader hasn't started, checkpoint the residualSource.
+ return new Checkpoint<>(null /* residualElements */, residualSource);
+ } else {
+ // Part of residualSource are consumed.
+ // Splits the residualSource and tracks the new residualElements in current source.
+ BoundedSource<T> residualSplit = null;
+ Double fractionConsumed = reader.getFractionConsumed();
+ if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
+ double fractionRest = 1 - fractionConsumed;
+ final int splitAttempts = 8;
+ // Try split points evenly spaced over the unread remainder, starting at the current
+ // position, until the reader accepts one.
+ for (int i = 0; i < splitAttempts && residualSplit == null; ++i) {
+ double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
+ residualSplit = reader.splitAtFraction(fractionToSplit);
+ }
+ }
+ List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+ try {
+ while (advance()) {
+ newResidualElements.add(
+ TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read elements from the bounded reader.", e);
+ }
+ return new Checkpoint<>(newResidualElements, residualSplit);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
new file mode 100755
index 0000000..ed03b53
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.apache.beam.sdk.util.Structs.addBoolean;
+import static org.apache.beam.sdk.util.Structs.addDictionary;
+import static org.apache.beam.sdk.util.Structs.addLong;
+
+import com.google.api.services.dataflow.model.SourceMetadata;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.internal.CustomSources;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * Translates a {@link Read.Bounded} transform into a Dataflow {@code ParallelRead} step whose
+ * input is a serialized custom source.
+ */
+class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
+ @Override
+ public void translate(Read.Bounded<?> transform, TranslationContext context) {
+ translateReadHelper(transform.getSource(), transform, context);
+ }
+
+ /**
+ * Adds a {@code ParallelRead} step reading {@code source} on behalf of {@code transform}.
+ *
+ * <p>Any exception raised during translation, including input-path validation failures, is
+ * rethrown wrapped in a {@link RuntimeException}.
+ */
+ public static <T> void translateReadHelper(Source<T> source,
+ PTransform<?, ? extends PValue> transform,
+ TranslationContext context) {
+ try {
+ validateFilePattern(source, context);
+
+ StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
+ stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
+ com.google.api.services.dataflow.model.Source cloudSource =
+ CustomSources.serializeToCloudSource(source, context.getPipelineOptions());
+ stepContext.addInput(PropertyNames.SOURCE_STEP_INPUT, cloudSourceToDictionary(cloudSource));
+ stepContext.addValueOnlyOutput(context.getOutput(transform));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * For a {@link FileBasedSource} whose file pattern is already known, checks that the pattern
+ * is supported by the pipeline's path validator. Other sources are not checked.
+ */
+ // TODO: Move this validation out of translation once IOChannelUtils is portable
+ // and can be reconstructed on the worker.
+ private static void validateFilePattern(Source<?> source, TranslationContext context) {
+ if (!(source instanceof FileBasedSource)) {
+ return;
+ }
+ ValueProvider<String> filePatternOrSpec =
+ ((FileBasedSource<?>) source).getFileOrPatternSpecProvider();
+ if (!filePatternOrSpec.isAccessible()) {
+ return;
+ }
+ context.getPipelineOptions()
+ .getPathValidator()
+ .validateInputFilePatternSupported(filePatternOrSpec.get());
+ }
+
+ /**
+ * Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT}
+ * property of CloudWorkflowStep.input. The source's encoding is deliberately not translated
+ * here; it is translated elsewhere into the step's output info.
+ */
+ private static Map<String, Object> cloudSourceToDictionary(
+ com.google.api.services.dataflow.model.Source source) {
+ Map<String, Object> dictionary = new HashMap<>();
+ addDictionary(dictionary, PropertyNames.SOURCE_SPEC, source.getSpec());
+
+ SourceMetadata metadata = source.getMetadata();
+ if (metadata != null) {
+ addDictionary(
+ dictionary, PropertyNames.SOURCE_METADATA, cloudSourceMetadataToDictionary(metadata));
+ }
+
+ Boolean doesNotNeedSplitting = source.getDoesNotNeedSplitting();
+ if (doesNotNeedSplitting != null) {
+ addBoolean(dictionary, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, doesNotNeedSplitting);
+ }
+ return dictionary;
+ }
+
+ /** Copies whichever of the estimated size and the infinite flag are set into a dictionary. */
+ private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) {
+ Map<String, Object> dictionary = new HashMap<>();
+
+ Long estimatedSizeBytes = metadata.getEstimatedSizeBytes();
+ if (estimatedSizeBytes != null) {
+ addLong(dictionary, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, estimatedSizeBytes);
+ }
+
+ Boolean infinite = metadata.getInfinite();
+ if (infinite != null) {
+ addBoolean(dictionary, PropertyNames.SOURCE_IS_INFINITE, infinite);
+ }
+ return dictionary;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index 2aa8327..fb883a7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.values.PValue;
* A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform}
* for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}.
*/
-public interface TransformTranslator<TransformT extends PTransform> {
+interface TransformTranslator<TransformT extends PTransform> {
void translate(TransformT transform, TranslationContext context);
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
deleted file mode 100644
index 27fe13d..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * A primitive {@link PTransform} that implements the {@link Window#into(WindowFn)}
- * {@link PTransform}.
- *
- * <p>For an application of {@link Window#into(WindowFn)} that specifies a {@link WindowFn}
- * (i.e. the wrapped transform's {@code getWindowFn()} is non-null), applies
- * a primitive {@link PTransform} in the Dataflow service.
- *
- * <p>For an application of {@link Window#into(WindowFn)} that does not specify a {@link WindowFn},
- * applies an identity {@link ParDo} and sets the windowing strategy of the output
- * {@link PCollection}.
- *
- * <p>For internal use only.
- *
- * @param <T> the type of input element
- */
-public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
- // The Window.Bound being replaced; the output windowing strategy and validation come from it.
- private final Window.Bound<T> transform;
-
- /**
- * Builds an instance of this class from the overridden transform.
- *
- * @param transform the {@link Window.Bound} application this primitive replaces
- */
- @SuppressWarnings("unused") // Used via reflection
- public AssignWindows(Window.Bound<T> transform) {
- this.transform = transform;
- }
-
- /**
- * Produces the windowed output: a primitive output when a new {@link WindowFn} is set,
- * otherwise an identity {@link ParDo} whose output carries the new windowing strategy.
- */
- @Override
- public PCollection<T> expand(PCollection<T> input) {
- WindowingStrategy<?, ?> outputStrategy =
- transform.getOutputStrategyInternal(input.getWindowingStrategy());
- if (transform.getWindowFn() != null) {
- // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), outputStrategy, input.isBounded());
- } else {
- // If the windowFn didn't change, we just run a pass-through transform and then set the
- // new windowing strategy.
- return input.apply("Identity", ParDo.of(new DoFn<T, T>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element());
- }
- })).setWindowingStrategyInternal(outputStrategy);
- }
- }
-
- // Validation is delegated to the wrapped Window.Bound.
- @Override
- public void validate(PCollection<T> input) {
- transform.validate(input);
- }
-
- // The output reuses the input's coder unchanged.
- @Override
- protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
- return input.getCoder();
- }
-
- // Reports the same kind string as Window.Into().
- @Override
- protected String getKindString() {
- return "Window.Into()";
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java
deleted file mode 100755
index fb78973..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used.
- */
-public class DataflowAggregatorTransforms {
- // Aggregator -> the PTransforms it is used in. Stored by reference, not copied.
- private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms;
- // PTransform -> every AppliedPTransform (possibly several) that applies it.
- private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms;
- // AppliedPTransform <-> step name; bidirectional so step names can be resolved back.
- private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames;
-
- /**
- * @param aggregatorTransforms the {@link PTransform PTransforms} each {@link Aggregator} is used in
- * @param transformStepNames the step name assigned to each {@link AppliedPTransform}; copied into
- * a {@link BiMap}, so step names must be unique
- */
- public DataflowAggregatorTransforms(
- Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms,
- Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
- this.aggregatorTransforms = aggregatorTransforms;
- appliedStepNames = HashBiMap.create(transformStepNames);
-
- // Index applications by their underlying transform for getAggregatorStepNames().
- transformAppliedTransforms = HashMultimap.create();
- for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) {
- transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform);
- }
- }
-
- /**
- * Returns true if the provided {@link Aggregator} is used in the constructed {@link Pipeline}.
- */
- public boolean contains(Aggregator<?, ?> aggregator) {
- return aggregatorTransforms.containsKey(aggregator);
- }
-
- /**
- * Gets the step names in which the {@link Aggregator} is used.
- *
- * <p>NOTE(review): for an aggregator not known to this mapping ({@link #contains} is false),
- * {@code transforms} below is null and the loop throws a NullPointerException.
- */
- public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) {
- Collection<String> names = new HashSet<>();
- Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator);
- for (PTransform<?, ?> transform : transforms) {
- for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) {
- names.add(appliedStepNames.get(applied));
- }
- }
- return names;
- }
-
- /**
- * Gets the {@link AppliedPTransform} that was assigned the provided step name, or null if no
- * transform has that step name.
- */
- public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) {
- return appliedStepNames.inverse().get(stepName);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java
deleted file mode 100755
index d715437..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * Methods for extracting the values of an {@link Aggregator} from a collection of {@link
- * MetricUpdate MetricUpdates}.
- */
-public final class DataflowMetricUpdateExtractor {
- // Context key whose value is the step name a metric update was reported for.
- private static final String STEP_NAME_CONTEXT_KEY = "step";
- // Context key whose value ("true"/"false") marks a tentative update.
- private static final String IS_TENTATIVE_KEY = "tentative";
-
- private DataflowMetricUpdateExtractor() {
- // Do not instantiate.
- }
-
- /**
- * Extract the values of the provided {@link Aggregator} at each {@link PTransform} it was used in
- * according to the provided {@link DataflowAggregatorTransforms} from the given list of {@link
- * MetricUpdate MetricUpdates}.
- *
- * @return a map from the full name of each applied transform to the aggregator's value there;
- * empty if {@code metricUpdates} is null
- */
- public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator,
- DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) {
- Map<String, OutputT> results = new HashMap<>();
- if (metricUpdates == null) {
- return results;
- }
-
- String aggregatorName = aggregator.getName();
- Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator);
-
- for (MetricUpdate metricUpdate : metricUpdates) {
- MetricStructuredName metricStructuredName = metricUpdate.getName();
- Map<String, String> context = metricStructuredName.getContext();
- // Keep only updates for this aggregator's name, reported from one of its steps.
- if (metricStructuredName.getName().equals(aggregatorName) && context != null
- && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) {
- AppliedPTransform<?, ?, ?> transform =
- aggregatorTransforms.getAppliedTransformForStepName(
- context.get(STEP_NAME_CONTEXT_KEY));
- String fullName = transform.getFullName();
- // Prefer the tentative (fresher) value if it exists.
- if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) {
- results.put(fullName, toValue(aggregator, metricUpdate));
- }
- }
- }
-
- return results;
-
- }
-
- /**
- * Converts the update's numeric scalar to the aggregator's output type. Only {@code Long},
- * {@code Integer} and {@code Double} outputs are supported; any other type throws
- * {@link UnsupportedOperationException}.
- */
- private static <OutputT> OutputT toValue(
- Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) {
- CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn();
- Class<? super OutputT> outputType = combineFn.getOutputType().getRawType();
-
- if (outputType.equals(Long.class)) {
- @SuppressWarnings("unchecked")
- OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue());
- return asLong;
- }
- if (outputType.equals(Integer.class)) {
- @SuppressWarnings("unchecked")
- OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue());
- return asInt;
- }
- if (outputType.equals(Double.class)) {
- @SuppressWarnings("unchecked")
- OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue());
- return asDouble;
- }
- throw new UnsupportedOperationException(
- "Unsupported Output Type " + outputType + " in aggregator " + aggregator);
- }
-
- /** Returns the update's scalar as a {@link Number}, or throws if it is not numeric. */
- private static Number toNumber(MetricUpdate update) {
- if (update.getScalar() instanceof Number) {
- return (Number) update.getScalar();
- }
- throw new IllegalArgumentException(
- "Metric Update " + update + " does not have a numeric scalar");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
deleted file mode 100644
index a2ae799..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.NameUtils;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.
- *
- * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
- * and element timestamps are propagated. While any elements remain, the watermark is the beginning
- * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
- * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
- *
- * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner
- * {@link BoundedSource}.
- * Sources that cannot be split are read entirely into memory, so this transform does not work well
- * with large, unsplittable sources.
- *
- * <p>This transform is intended to be used by a runner during pipeline translation to convert
- * a Read.Bounded into a Read.Unbounded.
- *
- * @deprecated This class is copied from beam runners core in order to avoid pipeline construction
- * time dependency. It should be replaced in the dataflow worker as an execution time dependency.
- */
-@Deprecated
-public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
-
- private final BoundedSource<T> source;
-
- /**
- * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
- */
- public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) {
- this.source = source;
- }
-
- @Override
- public PCollection<T> expand(PBegin input) {
- return input.getPipeline().apply(
- Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
- }
-
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
- }
-
- @Override
- public String getKindString() {
- return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource"));
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- // We explicitly do not register base-class data, instead we use the delegate inner source.
- builder
- .add(DisplayData.item("source", source.getClass()))
- .include("source", source);
- }
-
- /**
- * A {@code BoundedSource} to {@code UnboundedSource} adapter.
- */
- @VisibleForTesting
- public static class BoundedToUnboundedSourceAdapter<T>
- extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
-
- private BoundedSource<T> boundedSource;
-
- public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
- this.boundedSource = boundedSource;
- }
-
- @Override
- public void validate() {
- boundedSource.validate();
- }
-
- @Override
- public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
- int desiredNumSplits, PipelineOptions options) throws Exception {
- try {
- long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
- if (desiredBundleSize <= 0) {
- LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
- boundedSource);
- return ImmutableList.of(this);
- }
- List<? extends BoundedSource<T>> splits =
- boundedSource.splitIntoBundles(desiredBundleSize, options);
- if (splits == null) {
- LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
- return ImmutableList.of(this);
- }
- return Lists.transform(
- splits,
- new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
- @Override
- public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
- return new BoundedToUnboundedSourceAdapter<>(input);
- }});
- } catch (Exception e) {
- LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
- return ImmutableList.of(this);
- }
- }
-
- @Override
- public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
- throws IOException {
- if (checkpoint == null) {
- return new Reader(null /* residualElements */, boundedSource, options);
- } else {
- return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
- }
- }
-
- @Override
- public Coder<T> getDefaultOutputCoder() {
- return boundedSource.getDefaultOutputCoder();
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- @Override
- public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
- return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.add(DisplayData.item("source", boundedSource.getClass()));
- builder.include("source", boundedSource);
- }
-
- @VisibleForTesting
- static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
- private final @Nullable List<TimestampedValue<T>> residualElements;
- private final @Nullable BoundedSource<T> residualSource;
-
- public Checkpoint(
- @Nullable List<TimestampedValue<T>> residualElements,
- @Nullable BoundedSource<T> residualSource) {
- this.residualElements = residualElements;
- this.residualSource = residualSource;
- }
-
- @Override
- public void finalizeCheckpoint() {}
-
- @VisibleForTesting
- @Nullable List<TimestampedValue<T>> getResidualElements() {
- return residualElements;
- }
-
- @VisibleForTesting
- @Nullable BoundedSource<T> getResidualSource() {
- return residualSource;
- }
- }
-
- @VisibleForTesting
- static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
-
- @JsonCreator
- public static CheckpointCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 1,
- "Expecting 1 components, got %s", components.size());
- return new CheckpointCoder<>(components.get(0));
- }
-
- // The coder for a list of residual elements and their timestamps
- private final Coder<List<TimestampedValue<T>>> elemsCoder;
- // The coder from the BoundedReader for coding each element
- private final Coder<T> elemCoder;
- // The nullable and serializable coder for the BoundedSource.
- @SuppressWarnings("rawtypes")
- private final Coder<BoundedSource> sourceCoder;
-
- CheckpointCoder(Coder<T> elemCoder) {
- this.elemsCoder = NullableCoder.of(
- ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
- this.elemCoder = elemCoder;
- this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
- }
-
- @Override
- public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- elemsCoder.encode(value.residualElements, outStream, context.nested());
- sourceCoder.encode(value.residualSource, outStream, context);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Checkpoint<T> decode(InputStream inStream, Context context)
- throws CoderException, IOException {
- return new Checkpoint<>(
- elemsCoder.decode(inStream, context.nested()),
- sourceCoder.decode(inStream, context));
- }
-
- @Override
- public List<Coder<?>> getCoderArguments() {
- return Arrays.<Coder<?>>asList(elemCoder);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this,
- "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
- }
- }
-
- /**
- * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
- * {@link ResidualElements} and {@link ResidualSource}.
- *
- * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains
- * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will
- * be split into {@link ResidualElements} and {@link ResidualSource}.
- */
- @VisibleForTesting
- class Reader extends UnboundedReader<T> {
- private ResidualElements residualElements;
- private @Nullable ResidualSource residualSource;
- private final PipelineOptions options;
- private boolean done;
-
- Reader(
- @Nullable List<TimestampedValue<T>> residualElementsList,
- @Nullable BoundedSource<T> residualSource,
- PipelineOptions options) {
- init(residualElementsList, residualSource, options);
- this.options = checkNotNull(options, "options");
- this.done = false;
- }
-
- private void init(
- @Nullable List<TimestampedValue<T>> residualElementsList,
- @Nullable BoundedSource<T> residualSource,
- PipelineOptions options) {
- this.residualElements = residualElementsList == null
- ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
- : new ResidualElements(residualElementsList);
- this.residualSource =
- residualSource == null ? null : new ResidualSource(residualSource, options);
- }
-
- @Override
- public boolean start() throws IOException {
- return advance();
- }
-
- @Override
- public boolean advance() throws IOException {
- if (residualElements.advance()) {
- return true;
- } else if (residualSource != null && residualSource.advance()) {
- return true;
- } else {
- done = true;
- return false;
- }
- }
-
- @Override
- public void close() throws IOException {
- if (residualSource != null) {
- residualSource.close();
- }
- }
-
- @Override
- public T getCurrent() throws NoSuchElementException {
- if (residualElements.hasCurrent()) {
- return residualElements.getCurrent();
- } else if (residualSource != null) {
- return residualSource.getCurrent();
- } else {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- if (residualElements.hasCurrent()) {
- return residualElements.getCurrentTimestamp();
- } else if (residualSource != null) {
- return residualSource.getCurrentTimestamp();
- } else {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public Instant getWatermark() {
- return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>If only part of the {@link ResidualElements} is consumed, the new
- * checkpoint will contain the remaining elements in {@link ResidualElements} and
- * the {@link ResidualSource}.
- *
- * <p>If all {@link ResidualElements} and part of the
- * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
- * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
- * {@link ResidualSource} is the source split from the current source,
- * and {@link ResidualElements} contains rest elements from the current source after
- * the splitting. For unsplittable source, it will put all remaining elements into
- * the {@link ResidualElements}.
- */
- @Override
- public Checkpoint<T> getCheckpointMark() {
- Checkpoint<T> newCheckpoint;
- if (!residualElements.done()) {
- // Part of residualElements are consumed.
- // Checkpoints the remaining elements and residualSource.
- newCheckpoint = new Checkpoint<>(
- residualElements.getRestElements(),
- residualSource == null ? null : residualSource.getSource());
- } else if (residualSource != null) {
- newCheckpoint = residualSource.getCheckpointMark();
- } else {
- newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
- }
- // Re-initialize since the residualElements and the residualSource might be
- // consumed or split by checkpointing.
- init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
- return newCheckpoint;
- }
-
- @Override
- public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
- return BoundedToUnboundedSourceAdapter.this;
- }
- }
-
- private class ResidualElements {
- private final List<TimestampedValue<T>> elementsList;
- private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
- private @Nullable TimestampedValue<T> currentT;
- private boolean hasCurrent;
- private boolean done;
-
- ResidualElements(List<TimestampedValue<T>> residualElementsList) {
- this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
- this.elementsIterator = null;
- this.currentT = null;
- this.hasCurrent = false;
- this.done = false;
- }
-
- public boolean advance() {
- if (elementsIterator == null) {
- elementsIterator = elementsList.iterator();
- }
- if (elementsIterator.hasNext()) {
- currentT = elementsIterator.next();
- hasCurrent = true;
- return true;
- } else {
- done = true;
- hasCurrent = false;
- return false;
- }
- }
-
- boolean hasCurrent() {
- return hasCurrent;
- }
-
- boolean done() {
- return done;
- }
-
- TimestampedValue<T> getCurrentTimestampedValue() {
- if (!hasCurrent) {
- throw new NoSuchElementException();
- }
- return currentT;
- }
-
- T getCurrent() {
- return getCurrentTimestampedValue().getValue();
- }
-
- Instant getCurrentTimestamp() {
- return getCurrentTimestampedValue().getTimestamp();
- }
-
- List<TimestampedValue<T>> getRestElements() {
- if (elementsIterator == null) {
- return elementsList;
- } else {
- List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
- while (elementsIterator.hasNext()) {
- newResidualElements.add(elementsIterator.next());
- }
- return newResidualElements;
- }
- }
- }
-
- private class ResidualSource {
- private BoundedSource<T> residualSource;
- private PipelineOptions options;
- private @Nullable BoundedReader<T> reader;
- private boolean closed;
-
- public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
- this.residualSource = checkNotNull(residualSource, "residualSource");
- this.options = checkNotNull(options, "options");
- this.reader = null;
- this.closed = false;
- }
-
- private boolean advance() throws IOException {
- if (reader == null && !closed) {
- reader = residualSource.createReader(options);
- return reader.start();
- } else {
- return reader.advance();
- }
- }
-
- T getCurrent() throws NoSuchElementException {
- if (reader == null) {
- throw new NoSuchElementException();
- }
- return reader.getCurrent();
- }
-
- Instant getCurrentTimestamp() throws NoSuchElementException {
- if (reader == null) {
- throw new NoSuchElementException();
- }
- return reader.getCurrentTimestamp();
- }
-
- void close() throws IOException {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- closed = true;
- }
-
- BoundedSource<T> getSource() {
- return residualSource;
- }
-
- Checkpoint<T> getCheckpointMark() {
- if (reader == null) {
- // Reader hasn't started, checkpoint the residualSource.
- return new Checkpoint<>(null /* residualElements */, residualSource);
- } else {
- // Part of residualSource are consumed.
- // Splits the residualSource and tracks the new residualElements in current source.
- BoundedSource<T> residualSplit = null;
- Double fractionConsumed = reader.getFractionConsumed();
- if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
- double fractionRest = 1 - fractionConsumed;
- int splitAttempts = 8;
- for (int i = 0; i < 8 && residualSplit == null; ++i) {
- double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
- residualSplit = reader.splitAtFraction(fractionToSplit);
- }
- }
- List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
- try {
- while (advance()) {
- newResidualElements.add(
- TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to read elements from the bounded reader.", e);
- }
- return new Checkpoint<>(newResidualElements, residualSplit);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
deleted file mode 100755
index a15a2a3..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addDictionary;
-import static org.apache.beam.sdk.util.Structs.addLong;
-
-import com.google.api.services.dataflow.model.SourceMetadata;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.dataflow.TransformTranslator;
-import org.apache.beam.sdk.io.FileBasedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.values.PValue;
-
-/**
- * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end.
- */
-public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
- @Override
- public void translate(Read.Bounded<?> transform, TranslationContext context) {
- translateReadHelper(transform.getSource(), transform, context);
- }
-
- public static <T> void translateReadHelper(Source<T> source,
- PTransform<?, ? extends PValue> transform,
- TranslationContext context) {
- try {
- // TODO: Move this validation out of translation once IOChannelUtils is portable
- // and can be reconstructed on the worker.
- if (source instanceof FileBasedSource) {
- ValueProvider<String> filePatternOrSpec =
- ((FileBasedSource<?>) source).getFileOrPatternSpecProvider();
- if (filePatternOrSpec.isAccessible()) {
- context.getPipelineOptions()
- .getPathValidator()
- .validateInputFilePatternSupported(filePatternOrSpec.get());
- }
- }
-
- StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
- stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
- stepContext.addInput(
- PropertyNames.SOURCE_STEP_INPUT,
- cloudSourceToDictionary(
- CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
- stepContext.addValueOnlyOutput(context.getOutput(transform));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT}
- // property of CloudWorkflowStep.input.
- private static Map<String, Object> cloudSourceToDictionary(
- com.google.api.services.dataflow.model.Source source) {
- // Do not translate encoding - the source's encoding is translated elsewhere
- // to the step's output info.
- Map<String, Object> res = new HashMap<>();
- addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec());
- if (source.getMetadata() != null) {
- addDictionary(res, PropertyNames.SOURCE_METADATA,
- cloudSourceMetadataToDictionary(source.getMetadata()));
- }
- if (source.getDoesNotNeedSplitting() != null) {
- addBoolean(
- res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting());
- }
- return res;
- }
-
- private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) {
- Map<String, Object> res = new HashMap<>();
- if (metadata.getEstimatedSizeBytes() != null) {
- addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes());
- }
- if (metadata.getInfinite() != null) {
- addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite());
- }
- return res;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 6999e03..d5d7aa9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -54,7 +54,6 @@ import java.math.BigDecimal;
import java.net.SocketTimeoutException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.AggregatorRetrievalException;