You are viewing a plain-text version of this content. The canonical version was linked from the word "here", but that hyperlink was lost in this plain-text rendering.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:56 UTC
[32/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
deleted file mode 100644
index 8b066ab..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-/**
- * Pubsub transform support code for the Dataflow backend.
- *
- * <p>Holds the translators that turn {@link PubsubIO.Read.Bound} and the Dataflow runner's
- * {@code StreamingPubsubIOWrite} into "pubsub"-format Dataflow steps.
- */
-public class PubsubIOTranslator {
-
-  /**
-   * Implements PubsubIO Read translation for the Dataflow backend.
-   *
-   * <p>Translation fails unless the pipeline options request streaming execution.
-   */
-  public static class ReadTranslator<T> implements TransformTranslator<PubsubIO.Read.Bound<T>> {
-    @Override
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public void translate(
-        PubsubIO.Read.Bound transform,
-        TranslationContext context) {
-      translateReadHelper(transform, context);
-    }
-
-    /**
-     * Adds a "ParallelRead" step in the "pubsub" format for {@code transform}, copying the topic,
-     * subscription, timestamp label and id label onto the step only when each one is set.
-     *
-     * <p>The method type parameter is named {@code ElementT} (not {@code T}) so that it does not
-     * shadow the class-level type parameter.
-     *
-     * @throws IllegalArgumentException if the pipeline options are not in streaming mode
-     */
-    private <ElementT> void translateReadHelper(
-        PubsubIO.Read.Bound<ElementT> transform,
-        TranslationContext context) {
-      if (!context.getPipelineOptions().isStreaming()) {
-        throw new IllegalArgumentException(
-            "PubsubIO.Read can only be used with the Dataflow streaming runner.");
-      }
-
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, "pubsub");
-      if (transform.getTopic() != null) {
-        context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
-      }
-      if (transform.getSubscription() != null) {
-        context.addInput(
-            PropertyNames.PUBSUB_SUBSCRIPTION, transform.getSubscription().asV1Beta1Path());
-      }
-      if (transform.getTimestampLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
-      }
-      if (transform.getIdLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
-      }
-      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-    }
-  }
-
-  /**
-   * Implements PubsubIO Write translation for the Dataflow backend.
-   *
-   * <p>Translates the runner's {@code StreamingPubsubIOWrite}, reading the Pubsub settings from
-   * the {@link PubsubIO.Write.Bound} returned by its {@code getOverriddenTransform()}.
-   */
-  public static class WriteTranslator<T>
-      implements TransformTranslator<DataflowPipelineRunner.StreamingPubsubIOWrite<T>> {
-
-    @Override
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public void translate(
-        DataflowPipelineRunner.StreamingPubsubIOWrite transform,
-        TranslationContext context) {
-      translateWriteHelper(transform, context);
-    }
-
-    /**
-     * Adds a "ParallelWrite" step in the "pubsub" format for {@code customTransform}. The step
-     * always carries the topic, carries the timestamp and id labels when set, uses a value-only
-     * encoding of the overridden transform's coder, and takes {@code customTransform}'s input as
-     * its parallel input.
-     *
-     * <p>The method type parameter is named {@code ElementT} (not {@code T}) so that it does not
-     * shadow the class-level type parameter.
-     *
-     * @throws IllegalArgumentException if the pipeline options are not in streaming mode
-     */
-    private <ElementT> void translateWriteHelper(
-        DataflowPipelineRunner.StreamingPubsubIOWrite<ElementT> customTransform,
-        TranslationContext context) {
-      if (!context.getPipelineOptions().isStreaming()) {
-        throw new IllegalArgumentException(
-            "PubsubIO.Write is non-primitive for the Dataflow batch runner.");
-      }
-
-      PubsubIO.Write.Bound<ElementT> transform = customTransform.getOverriddenTransform();
-
-      context.addStep(customTransform, "ParallelWrite");
-      context.addInput(PropertyNames.FORMAT, "pubsub");
-      // NOTE(review): getTopic() is dereferenced without a null check; this assumes the
-      // overridden Write always has a topic. Confirm that it is validated upstream.
-      context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
-      if (transform.getTimestampLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
-      }
-      if (transform.getIdLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
-      }
-      context.addEncodingInput(WindowedValue.getValueOnlyCoder(transform.getCoder()));
-      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(customTransform));
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
deleted file mode 100644
index f110e84..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-import static com.google.cloud.dataflow.sdk.util.Structs.addDictionary;
-import static com.google.cloud.dataflow.sdk.util.Structs.addLong;
-
-import com.google.api.services.dataflow.model.SourceMetadata;
-import com.google.cloud.dataflow.sdk.io.FileBasedSource;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.Source;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.values.PValue;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Dataflow back-end translator for the {@code Read.Bounded} {@code PTransform}. It emits a
- * "ParallelRead" step whose input is the custom-source encoding of the transform's source.
- */
-public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
-  @Override
-  public void translate(Read.Bounded<?> transform, TranslationContext context) {
-    Source<?> boundedSource = transform.getSource();
-    translateReadHelper(boundedSource, transform, context);
-  }
-
-  /**
-   * Adds a "ParallelRead" step for {@code transform} to {@code context}. The step uses the custom
-   * source format and the serialized form of {@code source}. When {@code source} is a
-   * {@link FileBasedSource}, its file pattern is first checked by the pipeline's path validator.
-   *
-   * <p>Any exception raised while validating, serializing or building the step is rethrown
-   * wrapped in a {@link RuntimeException}.
-   */
-  public static <T> void translateReadHelper(Source<T> source,
-      PTransform<?, ? extends PValue> transform,
-      DataflowPipelineTranslator.TranslationContext context) {
-    try {
-      // TODO: Move this validation out of translation once IOChannelUtils is portable
-      // and can be reconstructed on the worker.
-      if (source instanceof FileBasedSource) {
-        FileBasedSource<?> fileBasedSource = (FileBasedSource<?>) source;
-        String filePatternOrSpec = fileBasedSource.getFileOrPatternSpec();
-        context.getPipelineOptions().getPathValidator()
-            .validateInputFilePatternSupported(filePatternOrSpec);
-      }
-
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
-      com.google.api.services.dataflow.model.Source cloudSource =
-          CustomSources.serializeToCloudSource(source, context.getPipelineOptions());
-      Map<String, Object> sourceDictionary = cloudSourceToDictionary(cloudSource);
-      context.addInput(PropertyNames.SOURCE_STEP_INPUT, sourceDictionary);
-      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Encodes a cloud {@code Source} as the dictionary stored in the {@code SOURCE_STEP_INPUT}
-   * property of {@code CloudWorkflowStep.input}. The spec is always written; metadata and the
-   * does-not-need-splitting flag are written only when present. The encoding is deliberately left
-   * out because it is translated elsewhere into the step's output info.
-   */
-  private static Map<String, Object> cloudSourceToDictionary(
-      com.google.api.services.dataflow.model.Source source) {
-    Map<String, Object> dictionary = new HashMap<>();
-    addDictionary(dictionary, PropertyNames.SOURCE_SPEC, source.getSpec());
-
-    SourceMetadata metadata = source.getMetadata();
-    if (metadata != null) {
-      Map<String, Object> metadataDictionary = cloudSourceMetadataToDictionary(metadata);
-      addDictionary(dictionary, PropertyNames.SOURCE_METADATA, metadataDictionary);
-    }
-
-    Boolean doesNotNeedSplitting = source.getDoesNotNeedSplitting();
-    if (doesNotNeedSplitting != null) {
-      addBoolean(
-          dictionary, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, doesNotNeedSplitting);
-    }
-    return dictionary;
-  }
-
-  /** Encodes each non-null field of {@code metadata} into a new dictionary. */
-  private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) {
-    Map<String, Object> dictionary = new HashMap<>();
-
-    Boolean producesSortedKeys = metadata.getProducesSortedKeys();
-    if (producesSortedKeys != null) {
-      addBoolean(dictionary, PropertyNames.SOURCE_PRODUCES_SORTED_KEYS, producesSortedKeys);
-    }
-
-    Long estimatedSizeBytes = metadata.getEstimatedSizeBytes();
-    if (estimatedSizeBytes != null) {
-      addLong(dictionary, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, estimatedSizeBytes);
-    }
-
-    Boolean infinite = metadata.getInfinite();
-    if (infinite != null) {
-      addBoolean(dictionary, PropertyNames.SOURCE_IS_INFINITE, infinite);
-    }
-    return dictionary;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java
deleted file mode 100644
index b6b2ce6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Implementation of the {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
deleted file mode 100644
index eaea3ed..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
-import com.google.cloud.dataflow.sdk.io.Read.Bounded;
-import com.google.cloud.dataflow.sdk.io.Source.Reader;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
- * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
- */
-final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
-  /*
-   * An evaluator for a Source is stateful, to ensure data is not read multiple times.
-   * Evaluators are cached here to ensure that the reader is not restarted if the evaluator is
-   * retriggered.
-   */
-  private final ConcurrentMap<EvaluatorKey, Queue<? extends BoundedReadEvaluator<?>>>
-      sourceEvaluators = new ConcurrentHashMap<>();
-
-  /**
-   * Returns the cached evaluator for {@code application} in {@code evaluationContext}. Once that
-   * evaluator has been handed out, returns an evaluator that produces nothing.
-   *
-   * <p>{@code inputBundle} is not used; elements come from the transform's source.
-   */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext)
-      throws IOException {
-    return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
-  }
-
-  /**
-   * Polls the evaluator queue for {@code transform}. Falls back to an
-   * {@link EmptyTransformEvaluator} when the queue has already been drained.
-   */
-  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
-      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext)
-      throws IOException {
-    BoundedReadEvaluator<?> evaluator =
-        getTransformEvaluatorQueue(transform, evaluationContext).poll();
-    if (evaluator == null) {
-      return EmptyTransformEvaluator.create(transform);
-    }
-    return evaluator;
-  }
-
-  /**
-   * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
-   * provided application of {@link Bounded Read.Bounded}, initializing it if required.
-   *
-   * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
-   * already done so.
-   */
-  @SuppressWarnings("unchecked")
-  private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
-      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
-    // Key by the application and the context the evaluation is occurring in (which call to
-    // Pipeline#run).
-    EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
-    Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue =
-        (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
-    if (evaluatorQueue != null) {
-      return evaluatorQueue;
-    }
-
-    Queue<BoundedReadEvaluator<OutputT>> newQueue = new ConcurrentLinkedQueue<>();
-    Queue<BoundedReadEvaluator<OutputT>> existingQueue =
-        (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.putIfAbsent(key, newQueue);
-    if (existingQueue != null) {
-      // Another invocation installed a queue first; use it rather than re-reading the map.
-      return existingQueue;
-    }
-    // This invocation installed the queue, so it seeds it with the single evaluator for this
-    // application. Later invocations find the queue and add no further evaluators.
-    // NOTE(review): the queue is visible to other threads before this offer, so a concurrent
-    // caller may briefly observe it empty and receive an EmptyTransformEvaluator.
-    newQueue.offer(new BoundedReadEvaluator<OutputT>(transform, evaluationContext));
-    return newQueue;
-  }
-
-  /**
-   * A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource},
-   * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
-   * creates the {@link BoundedReader} and consumes all available input.
-   *
-   * <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and
-   * each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source
-   * may produce duplicate elements.
-   */
-  private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
-    private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
-    private final InProcessEvaluationContext evaluationContext;
-
-    public BoundedReadEvaluator(
-        AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-        InProcessEvaluationContext evaluationContext) {
-      this.transform = transform;
-      this.evaluationContext = evaluationContext;
-    }
-
-    @Override
-    public void processElement(WindowedValue<Object> element) {
-      // Input elements are discarded; all output comes from the source in finishBundle().
-    }
-
-    /**
-     * Reads every element of the source into a new root bundle, stamping each element with the
-     * reader's timestamp in the global window. Returns a result that holds at
-     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} and carries that bundle as output.
-     */
-    @Override
-    public InProcessTransformResult finishBundle() throws IOException {
-      // try-with-resources closes the reader exactly once, on both the success and failure paths.
-      // The previous explicit reader.close() inside this block closed it a second time.
-      try (Reader<OutputT> reader =
-          transform
-              .getTransform()
-              .getSource()
-              .createReader(evaluationContext.getPipelineOptions())) {
-        boolean contentsRemaining = reader.start();
-        UncommittedBundle<OutputT> output =
-            evaluationContext.createRootBundle(transform.getOutput());
-        while (contentsRemaining) {
-          output.add(
-              WindowedValue.timestampedValueInGlobalWindow(
-                  reader.getCurrent(), reader.getCurrentTimestamp()));
-          contentsRemaining = reader.advance();
-        }
-        return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
-            .addOutput(output)
-            .build();
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
deleted file mode 100644
index 3350d2b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * An {@link ExecutorServiceFactory} whose {@link #create()} builds a new cached thread pool with
- * {@link Executors#newCachedThreadPool()}.
- *
- * <p>The class is also a {@link DefaultValueFactory} for {@link ExecutorServiceFactory}. In that
- * role it ignores the supplied {@link PipelineOptions} and always returns the same shared
- * instance.
- */
-class CachedThreadPoolExecutorServiceFactory
-    implements DefaultValueFactory<ExecutorServiceFactory>, ExecutorServiceFactory {
-  /** The one instance handed out by {@link #create(PipelineOptions)}. */
-  private static final CachedThreadPoolExecutorServiceFactory SHARED_FACTORY =
-      new CachedThreadPoolExecutorServiceFactory();
-
-  /**
-   * Builds a new cached thread pool on every call. This factory keeps no reference to the pools
-   * it creates.
-   */
-  @Override
-  public ExecutorService create() {
-    return Executors.newCachedThreadPool();
-  }
-
-  /** Returns the shared factory instance; {@code options} is not consulted. */
-  @Override
-  public ExecutorServiceFactory create(PipelineOptions options) {
-    return SHARED_FACTORY;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/Clock.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/Clock.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/Clock.java
deleted file mode 100644
index 11e6ec1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/Clock.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import org.joda.time.Instant;
-
-/**
- * Access to the current time.
- *
- * <p>Because this is an interface, the source of "now" is decided by whichever implementation is
- * supplied.
- */
-public interface Clock {
-  /**
-   * Returns the current time as an {@link Instant}.
-   *
-   * @return the instant this clock considers to be now
-   */
-  Instant now();
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java
deleted file mode 100644
index 2792631..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-
-/**
- * A callback for completing a bundle of input. It offers one method for the successful outcome
- * and one for an abnormal termination.
- */
-interface CompletionCallback {
-  /**
-   * Handle a successful result.
-   *
-   * @param inputBundle the bundle of input being completed
-   * @param result the successful result to handle
-   */
-  void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result);
-
-  /**
-   * Handle a result that terminated abnormally due to the provided {@link Throwable}.
-   *
-   * @param inputBundle the bundle of input being completed
-   * @param t the cause of the abnormal termination
-   */
-  void handleThrowable(CommittedBundle<?> inputBundle, Throwable t);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
deleted file mode 100644
index c602b23..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the
- * {@link Pipeline}. This is used to schedule consuming {@link PTransform PTransforms} to consume
- * input after the upstream transform has produced and committed output.
- *
- * <p>An instance may traverse only a single {@link Pipeline}. It becomes finalized when the
- * traversal leaves the root node. The accessors may only be called after that point.
- */
-public class ConsumerTrackingPipelineVisitor implements PipelineVisitor {
-  private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers =
-      new HashMap<>();
-  private final Collection<AppliedPTransform<?, ?, ?>> rootTransforms = new ArrayList<>();
-  private final Collection<PCollectionView<?>> views = new ArrayList<>();
-  private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
-  private final Set<PValue> toFinalize = new HashSet<>();
-  private int numTransforms = 0;
-  private boolean finalized = false;
-
-  @Override
-  public void enterCompositeTransform(TransformTreeNode node) {
-    checkState(
-        !finalized,
-        "Attempting to traverse a pipeline (node %s) with a %s "
-            + "which has already visited a Pipeline and is finalized",
-        node.getFullName(),
-        ConsumerTrackingPipelineVisitor.class.getSimpleName());
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
-    checkState(
-        !finalized,
-        "Attempting to traverse a pipeline (node %s) with a %s which is already finalized",
-        node.getFullName(),
-        ConsumerTrackingPipelineVisitor.class.getSimpleName());
-    if (node.isRootNode()) {
-      // Leaving the root node means the whole pipeline has been traversed.
-      finalized = true;
-    }
-  }
-
-  /**
-   * Records {@code node} as a consumer of each of its expanded inputs, or as a root transform if
-   * it has no inputs. Every visited transform, roots included, receives exactly one step name.
-   */
-  @Override
-  public void visitTransform(TransformTreeNode node) {
-    Collection<? extends PValue> inputs = node.getInput().expand();
-    // Consumed values no longer need to be finalized by the runner.
-    toFinalize.removeAll(inputs);
-    AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
-    // Named once per transform rather than once per input. Root transforms used to get no name,
-    // and multi-input transforms used to be renamed once for every input they consumed.
-    stepNames.put(appliedTransform, genStepName());
-    if (inputs.isEmpty()) {
-      rootTransforms.add(appliedTransform);
-    } else {
-      for (PValue value : inputs) {
-        // Assumes visitValue has already registered each input before its consumers are
-        // visited; an unregistered input would throw a NullPointerException here.
-        valueToConsumers.get(value).add(appliedTransform);
-      }
-    }
-  }
-
-  /** Builds the {@link AppliedPTransform} describing {@code node}. */
-  private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformTreeNode node) {
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of(
-        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform());
-    return application;
-  }
-
-  /**
-   * Registers each expanded component of {@code value} with an empty consumer list, collects any
-   * {@link PCollectionView PCollectionViews}, and records {@code producer} as the output's
-   * producer.
-   */
-  @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
-    toFinalize.add(value);
-    AppliedPTransform<?, ?, ?> producerTransform = getAppliedTransform(producer);
-    for (PValue expandedValue : value.expand()) {
-      valueToConsumers.put(expandedValue, new ArrayList<AppliedPTransform<?, ?, ?>>());
-      if (expandedValue instanceof PCollectionView) {
-        views.add((PCollectionView<?>) expandedValue);
-      }
-      expandedValue.recordAsOutput(producerTransform);
-    }
-    value.recordAsOutput(producerTransform);
-  }
-
-  /** Returns the next sequential step name: "s0", "s1", and so on. */
-  private String genStepName() {
-    return String.format("s%s", numTransforms++);
-  }
-
-  /**
-   * Returns a mapping of each fully-expanded {@link PValue} to each
-   * {@link AppliedPTransform} that consumes it. For each AppliedPTransform in the collection
-   * returned from {@code getValueToConsumers().get(PValue)},
-   * {@code AppliedPTransform#getInput().expand()} will contain the argument {@link PValue}.
-   */
-  public Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> getValueToConsumers() {
-    checkState(
-        finalized,
-        "Can't call getValueToConsumers before the Pipeline has been completely traversed");
-
-    return valueToConsumers;
-  }
-
-  /**
-   * Returns the mapping for each {@link AppliedPTransform} in the {@link Pipeline}, including
-   * root transforms, to a unique step name.
-   */
-  public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() {
-    checkState(
-        finalized, "Can't call getStepNames before the Pipeline has been completely traversed");
-
-    return stepNames;
-  }
-
-  /**
-   * Returns the root transforms of the {@link Pipeline}. A root {@link AppliedPTransform} consumes
-   * a {@link PInput} where the {@link PInput#expand()} returns an empty collection.
-   */
-  public Collection<AppliedPTransform<?, ?, ?>> getRootTransforms() {
-    checkState(
-        finalized,
-        "Can't call getRootTransforms before the Pipeline has been completely traversed");
-
-    return rootTransforms;
-  }
-
-  /**
-   * Returns all of the {@link PCollectionView PCollectionViews} contained in the visited
-   * {@link Pipeline}.
-   */
-  public Collection<PCollectionView<?>> getViews() {
-    checkState(finalized, "Can't call getViews before the Pipeline has been completely traversed");
-
-    return views;
-  }
-
-  /**
-   * Returns all of the {@link PValue PValues} that have been produced but not consumed. These
-   * {@link PValue PValues} should be finalized by the {@link PipelineRunner} before the
-   * {@link Pipeline} is executed.
-   */
-  public Set<PValue> getUnfinalizedPValues() {
-    checkState(
-        finalized,
-        "Can't call getUnfinalizedPValues before the Pipeline has been completely traversed");
-
-    return toFinalize;
-  }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java
deleted file mode 100644
index fc09237..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-/**
- * A {@link TransformEvaluator} that discards every input element and emits nothing.
- *
- * <p>Finishing a bundle yields an {@link InProcessTransformResult} with no outputs and a
- * timestamp hold at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Because the result carries no
- * elements, that hold does not affect the watermark.
- */
-final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> {
- /** The application whose (empty) result this evaluator reports. */
- private final AppliedPTransform<?, ?, ?> transform;
-
- private EmptyTransformEvaluator(AppliedPTransform<?, ?, ?> application) {
- this.transform = application;
- }
-
- /** Returns an evaluator for {@code transform} that ignores all of its input. */
- public static <T> TransformEvaluator<T> create(AppliedPTransform<?, ?, ?> transform) {
- return new EmptyTransformEvaluator<T>(transform);
- }
-
- @Override
- public void processElement(WindowedValue<T> element) throws Exception {
- // Intentionally empty: every input element is dropped.
- }
-
- @Override
- public InProcessTransformResult finishBundle() throws Exception {
- return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE).build();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
deleted file mode 100644
index 307bc5c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-
-import java.util.Objects;
-
-/**
- * A (Transform, Pipeline Execution) key for stateful evaluators.
- *
- * <p>Source evaluators are stateful to ensure data is not read multiple times. Evaluators are
- * cached to ensure that the reader is not restarted if the evaluator is retriggered. An
- * {@link EvaluatorKey} is used to ensure that multiple Pipelines can be executed without sharing
- * the same evaluators.
- *
- * <p>Two keys are equal exactly when both their transforms and their evaluation contexts are equal
- * according to {@link Objects#equals}; {@link #hashCode()} is derived from the same two fields.
- */
-final class EvaluatorKey {
- private final AppliedPTransform<?, ?, ?> transform;
- private final InProcessEvaluationContext context;
-
- /**
- * Creates a key identifying {@code transform} within the execution tracked by {@code context}.
- */
- public EvaluatorKey(AppliedPTransform<?, ?, ?> transform, InProcessEvaluationContext context) {
- this.transform = transform;
- this.context = context;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(transform, context);
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- // instanceof is false for null, so no separate null check is needed.
- if (!(other instanceof EvaluatorKey)) {
- return false;
- }
- EvaluatorKey that = (EvaluatorKey) other;
- return Objects.equals(this.transform, that.transform)
- && Objects.equals(this.context, that.context);
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceFactory.java
deleted file mode 100644
index 480bcde..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import java.util.concurrent.ExecutorService;
-
-/**
- * A factory that creates {@link ExecutorService ExecutorServices}.
- *
- * <p>{@link ExecutorService ExecutorServices} created by this factory should be independent of
- * one another (e.g., if any executor is shut down the remaining executors should continue to
- * process work).
- */
-public interface ExecutorServiceFactory {
- /**
- * Create a new {@link ExecutorService}.
- *
- * <p>Each call should return an executor that is independent of those returned by other calls,
- * so that shutting one of them down does not stop the others from processing work.
- *
- * @return a newly created {@link ExecutorService}
- */
- ExecutorService create();
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
deleted file mode 100644
index 68a1b8c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
-import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-
-import javax.annotation.Nullable;
-
-/**
- * An {@link InProcessExecutor} that uses an underlying {@link ExecutorService} and
- * {@link InProcessEvaluationContext} to execute a {@link Pipeline}.
- */
-final class ExecutorServiceParallelExecutor implements InProcessExecutor {
- private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
-
- private final ExecutorService executorService;
-
- private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
- private final Set<PValue> keyedPValues;
- private final TransformEvaluatorRegistry registry;
- private final InProcessEvaluationContext evaluationContext;
-
- private final ConcurrentMap<StepAndKey, TransformExecutorService> currentEvaluations;
- private final ConcurrentMap<TransformExecutor<?>, Boolean> scheduledExecutors;
-
- private final Queue<ExecutorUpdate> allUpdates;
- private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates;
-
- private final TransformExecutorService parallelExecutorService;
- private final CompletionCallback defaultCompletionCallback;
-
- private Collection<AppliedPTransform<?, ?, ?>> rootNodes;
-
- public static ExecutorServiceParallelExecutor create(
- ExecutorService executorService,
- Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
- Set<PValue> keyedPValues,
- TransformEvaluatorRegistry registry,
- InProcessEvaluationContext context) {
- return new ExecutorServiceParallelExecutor(
- executorService, valueToConsumers, keyedPValues, registry, context);
- }
-
- private ExecutorServiceParallelExecutor(
- ExecutorService executorService,
- Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
- Set<PValue> keyedPValues,
- TransformEvaluatorRegistry registry,
- InProcessEvaluationContext context) {
- this.executorService = executorService;
- this.valueToConsumers = valueToConsumers;
- this.keyedPValues = keyedPValues;
- this.registry = registry;
- this.evaluationContext = context;
-
- currentEvaluations = new ConcurrentHashMap<>();
- scheduledExecutors = new ConcurrentHashMap<>();
-
- this.allUpdates = new ConcurrentLinkedQueue<>();
- this.visibleUpdates = new ArrayBlockingQueue<>(20);
-
- parallelExecutorService =
- TransformExecutorServices.parallel(executorService, scheduledExecutors);
- defaultCompletionCallback = new DefaultCompletionCallback();
- }
-
- @Override
- public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
- rootNodes = ImmutableList.copyOf(roots);
- Runnable monitorRunnable = new MonitorRunnable();
- executorService.submit(monitorRunnable);
- }
-
- @SuppressWarnings("unchecked")
- public void scheduleConsumption(
- AppliedPTransform<?, ?, ?> consumer,
- @Nullable CommittedBundle<?> bundle,
- CompletionCallback onComplete) {
- evaluateBundle(consumer, bundle, onComplete);
- }
-
- private <T> void evaluateBundle(
- final AppliedPTransform<?, ?, ?> transform,
- @Nullable final CommittedBundle<T> bundle,
- final CompletionCallback onComplete) {
- TransformExecutorService transformExecutor;
- if (bundle != null && isKeyed(bundle.getPCollection())) {
- final StepAndKey stepAndKey =
- StepAndKey.of(transform, bundle == null ? null : bundle.getKey());
- transformExecutor = getSerialExecutorService(stepAndKey);
- } else {
- transformExecutor = parallelExecutorService;
- }
- TransformExecutor<T> callable =
- TransformExecutor.create(
- registry, evaluationContext, bundle, transform, onComplete, transformExecutor);
- transformExecutor.schedule(callable);
- }
-
- private boolean isKeyed(PValue pvalue) {
- return keyedPValues.contains(pvalue);
- }
-
- private void scheduleConsumers(CommittedBundle<?> bundle) {
- for (AppliedPTransform<?, ?, ?> consumer : valueToConsumers.get(bundle.getPCollection())) {
- scheduleConsumption(consumer, bundle, defaultCompletionCallback);
- }
- }
-
- private TransformExecutorService getSerialExecutorService(StepAndKey stepAndKey) {
- if (!currentEvaluations.containsKey(stepAndKey)) {
- currentEvaluations.putIfAbsent(
- stepAndKey, TransformExecutorServices.serial(executorService, scheduledExecutors));
- }
- return currentEvaluations.get(stepAndKey);
- }
-
- @Override
- public void awaitCompletion() throws Throwable {
- VisibleExecutorUpdate update;
- do {
- update = visibleUpdates.take();
- if (update.throwable.isPresent()) {
- throw update.throwable.get();
- }
- } while (!update.isDone());
- executorService.shutdown();
- }
-
- /**
- * The default {@link CompletionCallback}. The default completion callback is used to complete
- * transform evaluations that are triggered due to the arrival of elements from an upstream
- * transform, or for a source transform.
- */
- private class DefaultCompletionCallback implements CompletionCallback {
- @Override
- public void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result) {
- Iterable<? extends CommittedBundle<?>> resultBundles =
- evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result);
- for (CommittedBundle<?> outputBundle : resultBundles) {
- allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
- }
- }
-
- @Override
- public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
- allUpdates.offer(ExecutorUpdate.fromThrowable(t));
- }
- }
-
- /**
- * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection
- * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the
- * timers used to create the input to the {@link InProcessEvaluationContext evaluation context}
- * as part of the result.
- */
- private class TimerCompletionCallback implements CompletionCallback {
- private final Iterable<TimerData> timers;
-
- private TimerCompletionCallback(Iterable<TimerData> timers) {
- this.timers = timers;
- }
-
- @Override
- public void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result) {
- Iterable<? extends CommittedBundle<?>> resultBundles =
- evaluationContext.handleResult(inputBundle, timers, result);
- for (CommittedBundle<?> outputBundle : resultBundles) {
- allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
- }
- }
-
- @Override
- public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
- allUpdates.offer(ExecutorUpdate.fromThrowable(t));
- }
- }
-
- /**
- * An internal status update on the state of the executor.
- *
- * Used to signal when the executor should be shut down (due to an exception).
- */
- private static class ExecutorUpdate {
- private final Optional<? extends CommittedBundle<?>> bundle;
- private final Optional<? extends Throwable> throwable;
-
- public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) {
- return new ExecutorUpdate(bundle, null);
- }
-
- public static ExecutorUpdate fromThrowable(Throwable t) {
- return new ExecutorUpdate(null, t);
- }
-
- private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable throwable) {
- this.bundle = Optional.fromNullable(producedBundle);
- this.throwable = Optional.fromNullable(throwable);
- }
-
- public Optional<? extends CommittedBundle<?>> getBundle() {
- return bundle;
- }
-
- public Optional<? extends Throwable> getException() {
- return throwable;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(ExecutorUpdate.class)
- .add("bundle", bundle)
- .add("exception", throwable)
- .toString();
- }
- }
-
- /**
- * An update of interest to the user. Used in {@link #awaitCompletion} to decide whether to
- * return normally or throw an exception.
- */
- private static class VisibleExecutorUpdate {
- private final Optional<? extends Throwable> throwable;
- private final boolean done;
-
- public static VisibleExecutorUpdate fromThrowable(Throwable e) {
- return new VisibleExecutorUpdate(false, e);
- }
-
- public static VisibleExecutorUpdate finished() {
- return new VisibleExecutorUpdate(true, null);
- }
-
- private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) {
- this.throwable = Optional.fromNullable(exception);
- this.done = done;
- }
-
- public boolean isDone() {
- return done;
- }
- }
-
- private class MonitorRunnable implements Runnable {
- private final String runnableName =
- String.format(
- "%s$%s-monitor",
- evaluationContext.getPipelineOptions().getAppName(),
- ExecutorServiceParallelExecutor.class.getSimpleName());
-
- @Override
- public void run() {
- String oldName = Thread.currentThread().getName();
- Thread.currentThread().setName(runnableName);
- try {
- ExecutorUpdate update = allUpdates.poll();
- if (update != null) {
- LOG.debug("Executor Update: {}", update);
- if (update.getBundle().isPresent()) {
- scheduleConsumers(update.getBundle().get());
- } else if (update.getException().isPresent()) {
- visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
- }
- }
- boolean timersFired = fireTimers();
- addWorkIfNecessary(timersFired);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.error("Monitor died due to being interrupted");
- while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) {
- visibleUpdates.poll();
- }
- } catch (Throwable t) {
- LOG.error("Monitor thread died due to throwable", t);
- while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) {
- visibleUpdates.poll();
- }
- } finally {
- if (!shouldShutdown()) {
- // The monitor thread should always be scheduled; but we only need to be scheduled once
- executorService.submit(this);
- }
- Thread.currentThread().setName(oldName);
- }
- }
-
- /**
- * Fires any available timers. Returns true if at least one timer was fired.
- */
- private boolean fireTimers() throws Exception {
- try {
- boolean firedTimers = false;
- for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> transformTimers :
- evaluationContext.extractFiredTimers().entrySet()) {
- AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
- for (Map.Entry<Object, FiredTimers> keyTimers : transformTimers.getValue().entrySet()) {
- for (TimeDomain domain : TimeDomain.values()) {
- Collection<TimerData> delivery = keyTimers.getValue().getTimers(domain);
- if (delivery.isEmpty()) {
- continue;
- }
- KeyedWorkItem<Object, Object> work =
- KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
- @SuppressWarnings({"unchecked", "rawtypes"})
- CommittedBundle<?> bundle =
- InProcessBundle.<KeyedWorkItem<Object, Object>>keyed(
- (PCollection) transform.getInput(), keyTimers.getKey())
- .add(WindowedValue.valueInEmptyWindows(work))
- .commit(Instant.now());
- scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
- firedTimers = true;
- }
- }
- }
- return firedTimers;
- } catch (Exception e) {
- LOG.error("Internal Error while delivering timers", e);
- throw e;
- }
- }
-
- private boolean shouldShutdown() {
- if (evaluationContext.isDone()) {
- LOG.debug("Pipeline is finished. Shutting down. {}");
- while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
- visibleUpdates.poll();
- }
- executorService.shutdown();
- return true;
- }
- return false;
- }
-
- /**
- * If all active {@link TransformExecutor TransformExecutors} are in a blocked state,
- * add more work from root nodes that may have additional work. This ensures that if a pipeline
- * has elements available from the root nodes it will add those elements when necessary.
- */
- private void addWorkIfNecessary(boolean firedTimers) {
- // If any timers have fired, they will add more work; We don't need to add more
- if (firedTimers) {
- return;
- }
- for (TransformExecutor<?> executor : scheduledExecutors.keySet()) {
- if (!isExecutorBlocked(executor)) {
- // We have at least one executor that can proceed without adding additional work
- return;
- }
- }
- // All current TransformExecutors are blocked; add more work from the roots.
- for (AppliedPTransform<?, ?, ?> root : rootNodes) {
- if (!evaluationContext.isDone(root)) {
- scheduleConsumption(root, null, defaultCompletionCallback);
- }
- }
- }
-
- /**
- * Return true if the provided executor might make more progress if no action is taken.
- *
- * <p>May return false even if all executor threads are currently blocked or cleaning up, as
- * these can cause more work to be scheduled. If this does not occur, after these calls
- * terminate, future calls will return true if all executors are waiting.
- */
- private boolean isExecutorBlocked(TransformExecutor<?> executor) {
- Thread thread = executor.getThread();
- if (thread == null) {
- return false;
- }
- switch (thread.getState()) {
- case TERMINATED:
- throw new IllegalStateException(String.format(
- "Unexpectedly encountered a Terminated TransformExecutor %s", executor));
- case WAITING:
- case TIMED_WAITING:
- // The thread is waiting for some external input. Adding more work may cause the thread
- // to stop waiting (e.g. the thread is waiting on an unbounded side input)
- return true;
- case BLOCKED:
- // The executor is blocked on acquisition of a java monitor. This usually means it is
- // making a call to the EvaluationContext, but not a model-blocking call - and will
- // eventually complete, at which point we may reevaluate.
- default:
- // NEW and RUNNABLE threads can make progress
- return false;
- }
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
deleted file mode 100644
index ce315be..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.Flatten.FlattenPCollectionList;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
- * {@link PTransform}.
- *
- * <p>Each evaluator adds every input element, unchanged, to one output bundle for the Flatten's
- * output {@link PCollection}.
- */
-class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
- @Override
- public <InputT> TransformEvaluator<InputT> forApplication(
- AppliedPTransform<?, ?, ?> application,
- CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext) {
- // The raw cast assumes application is a FlattenPCollectionList application of InputT.
- @SuppressWarnings({"cast", "unchecked", "rawtypes"})
- TransformEvaluator<InputT> typedEvaluator = (TransformEvaluator<InputT>) createInMemoryEvaluator(
- (AppliedPTransform) application, inputBundle, evaluationContext);
- return typedEvaluator;
- }
-
- private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
- final AppliedPTransform<
- PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
- application,
- final CommittedBundle<InputT> inputBundle,
- final InProcessEvaluationContext evaluationContext) {
- if (inputBundle != null) {
- UncommittedBundle<InputT> flattened =
- evaluationContext.createBundle(inputBundle, application.getOutput());
- InProcessTransformResult flattenResult =
- StepTransformResult.withoutHold(application).addOutput(flattened).build();
- return new FlattenEvaluator<>(flattened, flattenResult);
- }
- // A Flatten with no input bundle is the output of Flatten.pcollections(PCollectionList.empty());
- // processElement can never be called on it, so it needs no output bundle.
- return new FlattenEvaluator<>(
- null, StepTransformResult.withoutHold(application).build());
- }
-
- /** Copies each input element into an output bundle and reports a precomputed result. */
- private static class FlattenEvaluator<InputT> implements TransformEvaluator<InputT> {
- private final UncommittedBundle<InputT> outputBundle;
- private final InProcessTransformResult result;
-
- public FlattenEvaluator(
- UncommittedBundle<InputT> bundle, InProcessTransformResult finishedResult) {
- this.outputBundle = bundle;
- this.result = finishedResult;
- }
-
- @Override
- public void processElement(WindowedValue<InputT> element) {
- outputBundle.add(element);
- }
-
- @Override
- public InProcessTransformResult finishBundle() {
- return result;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ForwardingPTransform.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ForwardingPTransform.java
deleted file mode 100644
index b736e35..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ForwardingPTransform.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.TypedPValue;
-
-/**
- * A base class for implementing {@link PTransform} overrides, which behave identically to the
- * delegate transform but with overridden methods. Implementors are required to implement
- * {@link #delegate()}, which returns the object to forward calls to, and {@link #apply(PInput)}.
- */
-public abstract class ForwardingPTransform<InputT extends PInput, OutputT extends POutput>
- extends PTransform<InputT, OutputT> {
- protected abstract PTransform<InputT, OutputT> delegate();
-
- @Override
- public OutputT apply(InputT input) {
- return delegate().apply(input);
- }
-
- @Override
- public void validate(InputT input) {
- delegate().validate(input);
- }
-
- @Override
- public String getName() {
- return delegate().getName();
- }
-
- @Override
- public <T> Coder<T> getDefaultOutputCoder(InputT input, @SuppressWarnings("unused")
- TypedPValue<T> output) throws CannotProvideCoderException {
- return delegate().getDefaultOutputCoder(input, output);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
deleted file mode 100644
index 3ec4af1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Builder;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey.ReifyTimestampsAndWindows;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
-import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
-import com.google.cloud.dataflow.sdk.util.KeyedWorkItemCoder;
-import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
-import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.annotations.VisibleForTesting;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link GroupByKey}
- * {@link PTransform}.
- *
- * <p>The evaluator buffers every element of an input bundle in memory, grouped by the encoded
- * bytes of its key, and emits one keyed output bundle per distinct key when the bundle finishes.
- */
-class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory {
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    // The factory interface is untyped; raw casts recover the concrete types. This presumably
-    // relies on the factory only being registered for InProcessGroupByKeyOnly applications.
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<InputT> evaluator = createEvaluator(
-        (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
-    return evaluator;
-  }
-
-  private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
-      final AppliedPTransform<
-              PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-              InProcessGroupByKeyOnly<K, V>>
-          application,
-      final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
-      final InProcessEvaluationContext evaluationContext) {
-    return new GroupByKeyEvaluator<K, V>(evaluationContext, inputBundle, application);
-  }
-
-  /**
-   * Buffers the elements of one input bundle by key. {@link #finishBundle()} emits a single
-   * {@link KeyedWorkItem} per distinct key, each in its own keyed output bundle.
-   */
-  private static class GroupByKeyEvaluator<K, V>
-      implements TransformEvaluator<KV<K, WindowedValue<V>>> {
-    private final InProcessEvaluationContext evaluationContext;
-
-    // Elements of this bundle are KV<K, WindowedValue<V>>, matching the transform's input type.
-    private final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle;
-    private final AppliedPTransform<
-            PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-            InProcessGroupByKeyOnly<K, V>>
-        application;
-    private final Coder<K> keyCoder;
-    // Keys are compared by their encoded bytes (see GroupingKey), not by Java equals().
-    private final Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
-
-    public GroupByKeyEvaluator(
-        InProcessEvaluationContext evaluationContext,
-        CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
-        AppliedPTransform<
-                PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-                InProcessGroupByKeyOnly<K, V>>
-            application) {
-      this.evaluationContext = evaluationContext;
-      this.inputBundle = inputBundle;
-      this.application = application;
-
-      PCollection<KV<K, WindowedValue<V>>> input = application.getInput();
-      keyCoder = getKeyCoder(input.getCoder());
-      groupingMap = new HashMap<>();
-    }
-
-    /**
-     * Extracts the key coder from the input's coder.
-     *
-     * @throws IllegalStateException if the input coder is not a {@link KvCoder}
-     */
-    private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
-      if (!(coder instanceof KvCoder)) {
-        throw new IllegalStateException(
-            String.format("GroupByKey input must use a KvCoder, but got coder %s", coder));
-      }
-      @SuppressWarnings("unchecked")
-      Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
-      return keyCoder;
-    }
-
-    /**
-     * Adds the element's value to the buffer for its key.
-     *
-     * @throws IllegalArgumentException if the key cannot be encoded with the key coder
-     */
-    @Override
-    public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
-      KV<K, WindowedValue<V>> kv = element.getValue();
-      K key = kv.getKey();
-      byte[] encodedKey;
-      try {
-        encodedKey = encodeToByteArray(keyCoder, key);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            String.format(
-                "unable to encode key %s of input to %s using %s",
-                key, application.getFullName(), keyCoder),
-            exn);
-      }
-      GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
-      List<WindowedValue<V>> values = groupingMap.get(groupingKey);
-      if (values == null) {
-        values = new ArrayList<WindowedValue<V>>();
-        groupingMap.put(groupingKey, values);
-      }
-      values.add(kv.getValue());
-    }
-
-    /**
-     * Emits one keyed bundle per distinct key. Each bundle holds a single
-     * {@link KeyedWorkItem} that carries all of that key's buffered values and is placed in
-     * empty windows. The result carries no watermark hold.
-     */
-    @Override
-    public InProcessTransformResult finishBundle() {
-      Builder resultBuilder = StepTransformResult.withoutHold(application);
-      for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
-          groupingMap.entrySet()) {
-        K key = groupedEntry.getKey().key;
-        KeyedWorkItem<K, V> groupedKv =
-            KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
-        UncommittedBundle<KeyedWorkItem<K, V>> bundle =
-            evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput());
-        bundle.add(WindowedValue.valueInEmptyWindows(groupedKv));
-        resultBuilder.addOutput(bundle);
-      }
-      return resultBuilder.build();
-    }
-
-    /**
-     * A map key that pairs a key with its encoded form. Equality and hashing use only the
-     * encoded bytes. The original key is kept so it can be emitted.
-     */
-    private static final class GroupingKey<K> {
-      private final K key;
-      private final byte[] encodedKey;
-
-      public GroupingKey(K key, byte[] encodedKey) {
-        this.key = key;
-        this.encodedKey = encodedKey;
-      }
-
-      @Override
-      public boolean equals(Object o) {
-        if (o instanceof GroupingKey) {
-          GroupingKey<?> that = (GroupingKey<?>) o;
-          return Arrays.equals(this.encodedKey, that.encodedKey);
-        } else {
-          return false;
-        }
-      }
-
-      @Override
-      public int hashCode() {
-        return Arrays.hashCode(encodedKey);
-      }
-    }
-  }
-
-  /**
-   * An in-memory implementation of the {@link GroupByKey} primitive as a composite
-   * {@link PTransform}.
-   */
-  public static final class InProcessGroupByKey<K, V>
-      extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-    private final GroupByKey<K, V> original;
-
-    // NOTE(review): this constructor is private and nothing in this class calls it; confirm how
-    // instances are meant to be created.
-    private InProcessGroupByKey(GroupByKey<K, V> from) {
-      this.original = from;
-    }
-
-    @Override
-    public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
-      return original;
-    }
-
-    /**
-     * Expands the transform into three steps: ReifyTimestampsAndWindows, then
-     * {@link InProcessGroupByKeyOnly}, then a GroupAlsoByWindow {@link ParDo}.
-     */
-    @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-      // Throws ClassCastException if the input coder is not a KvCoder.
-      KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
-
-      // This operation groups by the combination of key and window,
-      // merging windows as needed, using the windows assigned to the
-      // key/value input elements and the window merge operation of the
-      // window function associated with the input PCollection.
-      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-      // Use the default GroupAlsoByWindow implementation
-      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow =
-          groupAlsoByWindow(windowingStrategy, inputCoder.getValueCoder());
-
-      // By default, implement GroupByKey via a series of lower-level operations.
-      return input
-          // Make each input element's timestamp and assigned windows
-          // explicit, in the value part.
-          .apply(new ReifyTimestampsAndWindows<K, V>())
-
-          .apply(new InProcessGroupByKeyOnly<K, V>())
-          .setCoder(KeyedWorkItemCoder.of(inputCoder.getKeyCoder(),
-              inputCoder.getValueCoder(), windowingStrategy.getWindowFn().windowCoder()))
-
-          // Group each key's values by window, merging windows as needed.
-          .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow))
-
-          // And update the windowing strategy as appropriate.
-          .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
-          .setCoder(
-              KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
-    }
-
-    /**
-     * Builds the GroupAlsoByWindow {@link DoFn}, using a buffering {@link SystemReduceFn} that
-     * is based on {@code inputCoder}.
-     */
-    private <W extends BoundedWindow>
-        DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow(
-            final WindowingStrategy<?, W> windowingStrategy, final Coder<V> inputCoder) {
-      return GroupAlsoByWindowViaWindowSetDoFn.create(
-          windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
-    }
-  }
-
-  /**
-   * An implementation primitive to use in the evaluation of a {@link GroupByKey}
-   * {@link PTransform}. Its expansion only creates a primitive output {@link PCollection}.
-   * That output has the input's windowing strategy and boundedness. The grouping itself is
-   * performed by {@link GroupByKeyEvaluatorFactory}.
-   */
-  public static final class InProcessGroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
-    @Override
-    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
-      return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-    }
-
-    @VisibleForTesting
-    InProcessGroupByKeyOnly() {}
-  }
-}