You are viewing a plain-text version of this content. The hyperlink to its canonical version ("here") was not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:28 UTC
[04/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContexts.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContexts.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContexts.java
deleted file mode 100644
index e301d43..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContexts.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-import javax.annotation.Nullable;
-
-/**
- * Factory that produces {@link StateContext} based on different inputs.
- */
-public class StateContexts {
- private static final StateContext<BoundedWindow> NULL_CONTEXT =
- new StateContext<BoundedWindow>() {
- @Override
- public PipelineOptions getPipelineOptions() {
- throw new IllegalArgumentException("cannot call getPipelineOptions() in a null context");
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- throw new IllegalArgumentException("cannot call sideInput() in a null context");
- }
-
- @Override
- public BoundedWindow window() {
- throw new IllegalArgumentException("cannot call window() in a null context");
- }};
-
- /**
- * Returns a fake {@link StateContext}.
- */
- @SuppressWarnings("unchecked")
- public static <W extends BoundedWindow> StateContext<W> nullContext() {
- return (StateContext<W>) NULL_CONTEXT;
- }
-
- /**
- * Returns a {@link StateContext} that only contains the state window.
- */
- public static <W extends BoundedWindow> StateContext<W> windowOnly(final W window) {
- return new StateContext<W>() {
- @Override
- public PipelineOptions getPipelineOptions() {
- throw new IllegalArgumentException(
- "cannot call getPipelineOptions() in a window only context");
- }
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- throw new IllegalArgumentException("cannot call sideInput() in a window only context");
- }
- @Override
- public W window() {
- return window;
- }
- };
- }
-
- /**
- * Returns a {@link StateContext} from {@code PipelineOptions}, {@link WindowingInternals},
- * and the state window.
- */
- public static <W extends BoundedWindow> StateContext<W> createFromComponents(
- @Nullable final PipelineOptions options,
- final WindowingInternals<?, ?> windowingInternals,
- final W window) {
- @SuppressWarnings("unchecked")
- StateContext<W> typedNullContext = (StateContext<W>) NULL_CONTEXT;
- if (options == null) {
- return typedNullContext;
- } else {
- return new StateContext<W>() {
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return options;
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return windowingInternals.sideInput(view, window);
- }
-
- @Override
- public W window() {
- return window;
- }
- };
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateInternals.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateInternals.java
deleted file mode 100644
index b31afb4..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateInternals.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-
-/**
- * {@code StateInternals} describes the functionality a runner needs to provide for the
- * State API to be supported.
- *
- * <p>The SDK will only use this after elements have been partitioned by key. For instance, after a
- * {@link GroupByKey} operation. The runner implementation must ensure that any writes using
- * {@link StateInternals} are implicitly scoped to the key being processed and the specific step
- * accessing state.
- *
- * <p>The runner implementation must also ensure that any writes to the associated state objects
- * are persisted together with the completion status of the processing that produced these
- * writes.
- *
- * <p>This is a low-level API intended for use by the Dataflow SDK. It should not be
- * used directly, and is highly likely to change.
- */
-@Experimental(Kind.STATE)
-public interface StateInternals<K> {
-
- /** The key for this {@link StateInternals}. */
- K getKey();
-
- /**
- * Return the state associated with {@code address} in the specified {@code namespace}.
- */
- <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address);
-
- /**
- * Return the state associated with {@code address} in the specified {@code namespace}
- * with the {@link StateContext}.
- */
- <T extends State> T state(
- StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateMerging.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateMerging.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateMerging.java
deleted file mode 100644
index 0b33ea9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateMerging.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Helpers for merging state.
- */
-public class StateMerging {
- /**
- * Clear all state in {@code address} in all windows under merge (even result windows)
- * in {@code context}.
- */
- public static <K, StateT extends State, W extends BoundedWindow> void clear(
- MergingStateAccessor<K, W> context, StateTag<? super K, StateT> address) {
- for (StateT state : context.accessInEachMergingWindow(address).values()) {
- state.clear();
- }
- }
-
- /**
- * Prefetch all bag state in {@code address} across all windows under merge in
- * {@code context}, except for the bag state in the final state address window which we can
- * blindly append to.
- */
- public static <K, T, W extends BoundedWindow> void prefetchBags(
- MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) {
- Map<W, BagState<T>> map = context.accessInEachMergingWindow(address);
- if (map.isEmpty()) {
- // Nothing to prefetch.
- return;
- }
- BagState<T> result = context.access(address);
- // Prefetch everything except what's already in result.
- for (BagState<T> source : map.values()) {
- if (!source.equals(result)) {
- source.readLater();
- }
- }
- }
-
- /**
- * Merge all bag state in {@code address} across all windows under merge.
- */
- public static <K, T, W extends BoundedWindow> void mergeBags(
- MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) {
- mergeBags(context.accessInEachMergingWindow(address).values(), context.access(address));
- }
-
- /**
- * Merge all bag state in {@code sources} (which may include {@code result}) into {@code result}.
- */
- public static <T, W extends BoundedWindow> void mergeBags(
- Collection<BagState<T>> sources, BagState<T> result) {
- if (sources.isEmpty()) {
- // Nothing to merge.
- return;
- }
- // Prefetch everything except what's already in result.
- List<ReadableState<Iterable<T>>> futures = new ArrayList<>(sources.size());
- for (BagState<T> source : sources) {
- if (!source.equals(result)) {
- source.readLater();
- futures.add(source);
- }
- }
- if (futures.isEmpty()) {
- // Result already holds all the values.
- return;
- }
- // Transfer from sources to result.
- for (ReadableState<Iterable<T>> future : futures) {
- for (T element : future.read()) {
- result.add(element);
- }
- }
- // Clear sources except for result.
- for (BagState<T> source : sources) {
- if (!source.equals(result)) {
- source.clear();
- }
- }
- }
-
- /**
- * Prefetch all combining value state for {@code address} across all merging windows in {@code
- * context}.
- */
- public static <K, StateT extends CombiningState<?, ?>, W extends BoundedWindow> void
- prefetchCombiningValues(MergingStateAccessor<K, W> context,
- StateTag<? super K, StateT> address) {
- for (StateT state : context.accessInEachMergingWindow(address).values()) {
- state.readLater();
- }
- }
-
- /**
- * Merge all value state in {@code address} across all merging windows in {@code context}.
- */
- public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(
- MergingStateAccessor<K, W> context,
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address) {
- mergeCombiningValues(
- context.accessInEachMergingWindow(address).values(), context.access(address));
- }
-
- /**
- * Merge all value state from {@code sources} (which may include {@code result}) into
- * {@code result}.
- */
- public static <InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(
- Collection<AccumulatorCombiningState<InputT, AccumT, OutputT>> sources,
- AccumulatorCombiningState<InputT, AccumT, OutputT> result) {
- if (sources.isEmpty()) {
- // Nothing to merge.
- return;
- }
- if (sources.size() == 1 && sources.contains(result)) {
- // Result already holds combined value.
- return;
- }
- // Prefetch.
- List<ReadableState<AccumT>> futures = new ArrayList<>(sources.size());
- for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
- source.readLater();
- }
- // Read.
- List<AccumT> accumulators = new ArrayList<>(futures.size());
- for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
- accumulators.add(source.getAccum());
- }
- // Merge (possibly update and return one of the existing accumulators).
- AccumT merged = result.mergeAccumulators(accumulators);
- // Clear sources.
- for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
- source.clear();
- }
- // Update result.
- result.addAccum(merged);
- }
-
- /**
- * Prefetch all watermark state for {@code address} across all merging windows in
- * {@code context}.
- */
- public static <K, W extends BoundedWindow> void prefetchWatermarks(
- MergingStateAccessor<K, W> context,
- StateTag<? super K, WatermarkHoldState<W>> address) {
- Map<W, WatermarkHoldState<W>> map = context.accessInEachMergingWindow(address);
- WatermarkHoldState<W> result = context.access(address);
- if (map.isEmpty()) {
- // Nothing to prefetch.
- return;
- }
- if (map.size() == 1 && map.values().contains(result)
- && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
- // Nothing to change.
- return;
- }
- if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
- // No need to read existing holds.
- return;
- }
- // Prefetch.
- for (WatermarkHoldState<W> source : map.values()) {
- source.readLater();
- }
- }
-
- /**
- * Merge all watermark state in {@code address} across all merging windows in {@code context},
- * where the final merge result window is {@code mergeResult}.
- */
- public static <K, W extends BoundedWindow> void mergeWatermarks(
- MergingStateAccessor<K, W> context,
- StateTag<? super K, WatermarkHoldState<W>> address,
- W mergeResult) {
- mergeWatermarks(
- context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult);
- }
-
- /**
- * Merge all watermark state in {@code sources} (which must include {@code result} if non-empty)
- * into {@code result}, where the final merge result window is {@code mergeResult}.
- */
- public static <W extends BoundedWindow> void mergeWatermarks(
- Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result,
- W resultWindow) {
- if (sources.isEmpty()) {
- // Nothing to merge.
- return;
- }
- if (sources.size() == 1 && sources.contains(result)
- && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
- // Nothing to merge.
- return;
- }
- if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
- // Clear sources.
- for (WatermarkHoldState<W> source : sources) {
- source.clear();
- }
- // Update directly from window-derived hold.
- Instant hold = result.getOutputTimeFn().assignOutputTime(
- BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow);
- Preconditions.checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE));
- result.add(hold);
- } else {
- // Prefetch.
- List<ReadableState<Instant>> futures = new ArrayList<>(sources.size());
- for (WatermarkHoldState<W> source : sources) {
- futures.add(source);
- }
- // Read.
- List<Instant> outputTimesToMerge = new ArrayList<>(sources.size());
- for (ReadableState<Instant> future : futures) {
- Instant sourceOutputTime = future.read();
- if (sourceOutputTime != null) {
- outputTimesToMerge.add(sourceOutputTime);
- }
- }
- // Clear sources.
- for (WatermarkHoldState<W> source : sources) {
- source.clear();
- }
- if (!outputTimesToMerge.isEmpty()) {
- // Merge and update.
- result.add(result.getOutputTimeFn().merge(resultWindow, outputTimesToMerge));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespace.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespace.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespace.java
deleted file mode 100644
index f972e31..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespace.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import java.io.IOException;
-
-/**
- * A namespace used for scoping state stored with {@link StateInternals}.
- *
- * <p>Instances of {@code StateNamespace} are guaranteed to have a {@link #hashCode} and
- * {@link #equals} that uniquely identify the namespace.
- */
-public interface StateNamespace {
-
- /**
- * Return a {@link String} representation of the key. It is guaranteed that this
- * {@code String} will uniquely identify the key.
- *
- * <p>This will encode the actual namespace as a {@code String}. It is
- * preferable to use the {@code StateNamespace} object when possible.
- *
- * <p>The string produced by the standard implementations will not contain a '+' character. This
- * enables adding a '+' between the actual namespace and other information, if needed, to separate
- * the two.
- */
- String stringKey();
-
- /**
- * Append the string representation of this key to the {@link Appendable}.
- */
- void appendTo(Appendable sb) throws IOException;
-
- /**
- * Return an {@code Object} to use as a key in a cache.
- *
- * <p>Different namespaces may use the same key in order to be treated as a unit in the cache.
- * The {@code Object}'s {@code hashCode} and {@code equals} methods will be used to determine
- * equality.
- */
- Object getCacheKey();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaceForTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaceForTest.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaceForTest.java
deleted file mode 100644
index 09b86d6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaceForTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import java.io.IOException;
-import java.util.Objects;
-
-/**
- * A simple {@link StateNamespace} used for testing.
- */
-public class StateNamespaceForTest implements StateNamespace {
- private String key;
-
- public StateNamespaceForTest(String key) {
- this.key = key;
- }
-
- @Override
- public String stringKey() {
- return key;
- }
-
- @Override
- public Object getCacheKey() {
- return key;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (!(obj instanceof StateNamespaceForTest)) {
- return false;
- }
-
- return Objects.equals(this.key, ((StateNamespaceForTest) obj).key);
- }
-
- @Override
- public int hashCode() {
- return key.hashCode();
- }
-
- @Override
- public void appendTo(Appendable sb) throws IOException {
- sb.append(key);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaces.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaces.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaces.java
deleted file mode 100644
index 8fee995..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateNamespaces.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.common.base.Splitter;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Factory methods for creating the {@link StateNamespace StateNamespaces}.
- */
-public class StateNamespaces {
-
- private enum Namespace {
- GLOBAL,
- WINDOW,
- WINDOW_AND_TRIGGER;
- }
-
- public static StateNamespace global() {
- return new GlobalNamespace();
- }
-
- public static <W extends BoundedWindow> StateNamespace window(Coder<W> windowCoder, W window) {
- return new WindowNamespace<>(windowCoder, window);
- }
-
- public static <W extends BoundedWindow>
- StateNamespace windowAndTrigger(Coder<W> windowCoder, W window, int triggerIdx) {
- return new WindowAndTriggerNamespace<>(windowCoder, window, triggerIdx);
- }
-
- private StateNamespaces() {}
-
- /**
- * {@link StateNamespace} that is global to the current key being processed.
- */
- public static class GlobalNamespace implements StateNamespace {
-
- private static final String GLOBAL_STRING = "/";
-
- @Override
- public String stringKey() {
- return GLOBAL_STRING;
- }
-
- @Override
- public Object getCacheKey() {
- return GLOBAL_STRING;
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj == this || obj instanceof GlobalNamespace;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(Namespace.GLOBAL);
- }
-
- @Override
- public String toString() {
- return "Global";
- }
-
- @Override
- public void appendTo(Appendable sb) throws IOException {
- sb.append(GLOBAL_STRING);
- }
- }
-
- /**
- * {@link StateNamespace} that is scoped to a specific window.
- */
- public static class WindowNamespace<W extends BoundedWindow> implements StateNamespace {
-
- private static final String WINDOW_FORMAT = "/%s/";
-
- private Coder<W> windowCoder;
- private W window;
-
- private WindowNamespace(Coder<W> windowCoder, W window) {
- this.windowCoder = windowCoder;
- this.window = window;
- }
-
- public W getWindow() {
- return window;
- }
-
- @Override
- public String stringKey() {
- try {
- return String.format(WINDOW_FORMAT, CoderUtils.encodeToBase64(windowCoder, window));
- } catch (CoderException e) {
- throw new RuntimeException("Unable to generate string key from window " + window, e);
- }
- }
-
- @Override
- public void appendTo(Appendable sb) throws IOException {
- sb.append('/').append(CoderUtils.encodeToBase64(windowCoder, window)).append('/');
- }
-
- /**
- * State in the same window will all be evicted together.
- */
- @Override
- public Object getCacheKey() {
- return window;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof WindowNamespace)) {
- return false;
- }
-
- WindowNamespace<?> that = (WindowNamespace<?>) obj;
- return Objects.equals(this.window, that.window);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(Namespace.WINDOW, window);
- }
-
- @Override
- public String toString() {
- return "Window(" + window + ")";
- }
- }
-
- /**
- * {@link StateNamespace} that is scoped to a particular window and trigger index.
- */
- public static class WindowAndTriggerNamespace<W extends BoundedWindow>
- implements StateNamespace {
-
- private static final String WINDOW_AND_TRIGGER_FORMAT = "/%s/%s/";
-
- private static final int TRIGGER_RADIX = 36;
- private Coder<W> windowCoder;
- private W window;
- private int triggerIndex;
-
- private WindowAndTriggerNamespace(Coder<W> windowCoder, W window, int triggerIndex) {
- this.windowCoder = windowCoder;
- this.window = window;
- this.triggerIndex = triggerIndex;
- }
-
- public W getWindow() {
- return window;
- }
-
- public int getTriggerIndex() {
- return triggerIndex;
- }
-
- @Override
- public String stringKey() {
- try {
- return String.format(WINDOW_AND_TRIGGER_FORMAT,
- CoderUtils.encodeToBase64(windowCoder, window),
- // Use base 36 so that can address 36 triggers in a single byte and still be human
- // readable.
- Integer.toString(triggerIndex, TRIGGER_RADIX).toUpperCase());
- } catch (CoderException e) {
- throw new RuntimeException("Unable to generate string key from window " + window, e);
- }
- }
-
- @Override
- public void appendTo(Appendable sb) throws IOException {
- sb.append('/').append(CoderUtils.encodeToBase64(windowCoder, window));
- sb.append('/').append(Integer.toString(triggerIndex, TRIGGER_RADIX).toUpperCase());
- sb.append('/');
- }
-
- /**
- * State in the same window will all be evicted together.
- */
- @Override
- public Object getCacheKey() {
- return window;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof WindowAndTriggerNamespace)) {
- return false;
- }
-
- WindowAndTriggerNamespace<?> that = (WindowAndTriggerNamespace<?>) obj;
- return this.triggerIndex == that.triggerIndex
- && Objects.equals(this.window, that.window);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(Namespace.WINDOW_AND_TRIGGER, window, triggerIndex);
- }
-
- @Override
- public String toString() {
- return "WindowAndTrigger(" + window + "," + triggerIndex + ")";
- }
- }
-
- private static final Splitter SLASH_SPLITTER = Splitter.on('/');
-
- /**
- * Convert a {@code stringKey} produced using {@link StateNamespace#stringKey}
- * on one of the namespaces produced by this class into the original
- * {@link StateNamespace}.
- */
- public static <W extends BoundedWindow> StateNamespace fromString(
- String stringKey, Coder<W> windowCoder) {
- if (!stringKey.startsWith("/") || !stringKey.endsWith("/")) {
- throw new RuntimeException("Invalid namespace string: '" + stringKey + "'");
- }
-
- if (GlobalNamespace.GLOBAL_STRING.equals(stringKey)) {
- return global();
- }
-
- List<String> parts = SLASH_SPLITTER.splitToList(stringKey);
- if (parts.size() != 3 && parts.size() != 4) {
- throw new RuntimeException("Invalid namespace string: '" + stringKey + "'");
- }
- // Ends should be empty (we start and end with /)
- if (!parts.get(0).isEmpty() || !parts.get(parts.size() - 1).isEmpty()) {
- throw new RuntimeException("Invalid namespace string: '" + stringKey + "'");
- }
-
- try {
- W window = CoderUtils.decodeFromBase64(windowCoder, parts.get(1));
- if (parts.size() > 3) {
- int index = Integer.parseInt(parts.get(2), WindowAndTriggerNamespace.TRIGGER_RADIX);
- return windowAndTrigger(windowCoder, window, index);
- } else {
- return window(windowCoder, window);
- }
- } catch (Exception e) {
- throw new RuntimeException("Invalid namespace string: '" + stringKey + "'", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTable.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTable.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTable.java
deleted file mode 100644
index edd1dae..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTable.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Table;
-import com.google.common.collect.Tables;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Table mapping {@code StateNamespace} and {@code StateTag<?>} to a {@code State} instance.
- */
-public abstract class StateTable<K> {
-
- private final Table<StateNamespace, StateTag<? super K, ?>, State> stateTable =
- Tables.newCustomTable(new HashMap<StateNamespace, Map<StateTag<? super K, ?>, State>>(),
- new Supplier<Map<StateTag<? super K, ?>, State>>() {
- @Override
- public Map<StateTag<? super K, ?>, State> get() {
- return new HashMap<>();
- }
- });
-
- /**
- * Gets the {@link State} in the specified {@link StateNamespace} with the specified {@link
- * StateTag}, binding it using the {@link #binderForNamespace} if it is not
- * already present in this {@link StateTable}.
- */
- public <StateT extends State> StateT get(
- StateNamespace namespace, StateTag<? super K, StateT> tag, StateContext<?> c) {
- State storage = stateTable.get(namespace, tag);
- if (storage != null) {
- @SuppressWarnings("unchecked")
- StateT typedStorage = (StateT) storage;
- return typedStorage;
- }
-
- StateT typedStorage = tag.bind(binderForNamespace(namespace, c));
- stateTable.put(namespace, tag, typedStorage);
- return typedStorage;
- }
-
- public void clearNamespace(StateNamespace namespace) {
- stateTable.rowKeySet().remove(namespace);
- }
-
- public void clear() {
- stateTable.clear();
- }
-
- public Iterable<State> values() {
- return stateTable.values();
- }
-
- public boolean isNamespaceInUse(StateNamespace namespace) {
- return stateTable.containsRow(namespace);
- }
-
- public Map<StateTag<? super K, ?>, State> getTagsInUse(StateNamespace namespace) {
- return stateTable.row(namespace);
- }
-
- public Set<StateNamespace> getNamespacesInUse() {
- return stateTable.rowKeySet();
- }
-
- /**
- * Provide the {@code StateBinder} to use for creating {@code Storage} instances
- * in the specified {@code namespace}.
- */
- protected abstract StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTag.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTag.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTag.java
deleted file mode 100644
index c87bdb7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTag.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * An address for persistent state. This includes a unique identifier for the location, the
- * information necessary to encode the value, and details about the intended access pattern.
- *
- * <p>State can be thought of as a sparse table, with each {@code StateTag} defining a column
- * that has cells of type {@code StateT}.
- *
- * <p>Currently, this can only be used in a step immediately following a {@link GroupByKey}.
- *
- * @param <K> The type of key that must be used with the state tag. Contravariant: methods should
- * accept values of type {@code KeyedStateTag<? super K, StateT>}.
- * @param <StateT> The type of state being tagged.
- */
-@Experimental(Kind.STATE)
-public interface StateTag<K, StateT extends State> extends Serializable {
-
- /**
- * Visitor for binding a {@link StateTag} and to the associated {@link State}.
- *
- * @param <K> the type of key this binder embodies.
- */
- public interface StateBinder<K> {
- <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder);
-
- <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder);
-
- <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn);
-
- <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
-
- <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn);
-
- /**
- * Bind to a watermark {@link StateTag}.
- *
- * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps
- * added to the returned {@link WatermarkHoldState} are to be combined.
- */
- <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn);
- }
-
- /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */
- void appendTo(Appendable sb) throws IOException;
-
- /**
- * Returns the user-provided name of this state cell.
- */
- String getId();
-
- /**
- * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
- */
- StateT bind(StateBinder<? extends K> binder);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java
deleted file mode 100644
index ec9a78f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java
+++ /dev/null
@@ -1,579 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.common.base.MoreObjects;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * Static utility methods for creating {@link StateTag} instances.
- */
-@Experimental(Kind.STATE)
-public class StateTags {
-
- private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry();
-
- static {
- STANDARD_REGISTRY.registerStandardCoders();
- }
-
- private enum StateKind {
- SYSTEM('s'),
- USER('u');
-
- private char prefix;
-
- StateKind(char prefix) {
- this.prefix = prefix;
- }
- }
-
- private StateTags() { }
-
- private interface SystemStateTag<K, StateT extends State> {
- StateTag<K, StateT> asKind(StateKind kind);
- }
-
- /**
- * Create a simple state tag for values of type {@code T}.
- */
- public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> valueCoder) {
- return new ValueStateTag<>(new StructuredId(id), valueCoder);
- }
-
- /**
- * Create a state tag for values that use a {@link CombineFn} to automatically merge
- * multiple {@code InputT}s into a single {@code OutputT}.
- */
- public static <InputT, AccumT, OutputT>
- StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
- combiningValue(
- String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
- return combiningValueInternal(id, accumCoder, combineFn);
- }
-
- /**
- * Create a state tag for values that use a {@link KeyedCombineFn} to automatically merge
- * multiple {@code InputT}s into a single {@code OutputT}. The key provided to the
- * {@link KeyedCombineFn} comes from the keyed {@link StateAccessor}.
- */
- public static <K, InputT, AccumT,
- OutputT> StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
- keyedCombiningValue(String id, Coder<AccumT> accumCoder,
- KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return keyedCombiningValueInternal(id, accumCoder, combineFn);
- }
-
- /**
- * Create a state tag for values that use a {@link KeyedCombineFnWithContext} to automatically
- * merge multiple {@code InputT}s into a single {@code OutputT}. The key provided to the
- * {@link KeyedCombineFn} comes from the keyed {@link StateAccessor}, the context provided comes
- * from the {@link StateContext}.
- */
- public static <K, InputT, AccumT, OutputT>
- StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
- keyedCombiningValueWithContext(
- String id,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>(
- new StructuredId(id),
- accumCoder,
- combineFn);
- }
-
- /**
- * Create a state tag for values that use a {@link CombineFn} to automatically merge
- * multiple {@code InputT}s into a single {@code OutputT}.
- *
- * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and
- * should only be used to initialize static values.
- */
- public static <InputT, AccumT, OutputT>
- StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
- combiningValueFromInputInternal(
- String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
- try {
- Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
- return combiningValueInternal(id, accumCoder, combineFn);
- } catch (CannotProvideCoderException e) {
- throw new IllegalArgumentException(
- "Unable to determine accumulator coder for " + combineFn.getClass().getSimpleName()
- + " from " + inputCoder, e);
- }
- }
-
- private static <InputT, AccumT,
- OutputT> StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
- combiningValueInternal(
- String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
- return
- new CombiningValueStateTag<InputT, AccumT, OutputT>(
- new StructuredId(id), accumCoder, combineFn);
- }
-
- private static <K, InputT, AccumT, OutputT>
- StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
- String id,
- Coder<AccumT> accumCoder,
- KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>(
- new StructuredId(id), accumCoder, combineFn);
- }
-
- /**
- * Create a state tag that is optimized for adding values frequently, and
- * occasionally retrieving all the values that have been added.
- */
- public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> elemCoder) {
- return new BagStateTag<T>(new StructuredId(id), elemCoder);
- }
-
- /**
- * Create a state tag for holding the watermark.
- */
- public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>>
- watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) {
- return new WatermarkStateTagInternal<W>(new StructuredId(id), outputTimeFn);
- }
-
- /**
- * Convert an arbitrary {@link StateTag} to a system-internal tag that is guaranteed not to
- * collide with any user tags.
- */
- public static <K, StateT extends State> StateTag<K, StateT> makeSystemTagInternal(
- StateTag<K, StateT> tag) {
- if (!(tag instanceof SystemStateTag)) {
- throw new IllegalArgumentException("Expected subclass of StateTagBase, got " + tag);
- }
- // Checked above
- @SuppressWarnings("unchecked")
- SystemStateTag<K, StateT> typedTag = (SystemStateTag<K, StateT>) tag;
- return typedTag.asKind(StateKind.SYSTEM);
- }
-
- public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>>
- convertToBagTagInternal(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningTag) {
- if (combiningTag instanceof KeyedCombiningValueStateTag) {
- // Checked above; conversion to a bag tag depends on the provided tag being one of those
- // created via the factory methods in this class.
- @SuppressWarnings("unchecked")
- KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT> typedTag =
- (KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>) combiningTag;
- return typedTag.asBagTag();
- } else if (combiningTag instanceof KeyedCombiningValueWithContextStateTag) {
- @SuppressWarnings("unchecked")
- KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT> typedTag =
- (KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>) combiningTag;
- return typedTag.asBagTag();
- } else {
- throw new IllegalArgumentException("Unexpected StateTag " + combiningTag);
- }
- }
-
- private static class StructuredId implements Serializable {
- private final StateKind kind;
- private final String rawId;
-
- private StructuredId(String rawId) {
- this(StateKind.USER, rawId);
- }
-
- private StructuredId(StateKind kind, String rawId) {
- this.kind = kind;
- this.rawId = rawId;
- }
-
- public StructuredId asKind(StateKind kind) {
- return new StructuredId(kind, rawId);
- }
-
- public void appendTo(Appendable sb) throws IOException {
- sb.append(kind.prefix).append(rawId);
- }
-
- public String getRawId() {
- return rawId;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("id", rawId)
- .add("kind", kind)
- .toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof StructuredId)) {
- return false;
- }
-
- StructuredId that = (StructuredId) obj;
- return Objects.equals(this.kind, that.kind)
- && Objects.equals(this.rawId, that.rawId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(kind, rawId);
- }
- }
-
- /**
- * A base class that just manages the structured ids.
- */
- private abstract static class StateTagBase<K, StateT extends State>
- implements StateTag<K, StateT>, SystemStateTag<K, StateT> {
-
- protected final StructuredId id;
-
- protected StateTagBase(StructuredId id) {
- this.id = id;
- }
-
- @Override
- public String getId() {
- return id.getRawId();
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("id", id)
- .toString();
- }
-
- @Override
- public void appendTo(Appendable sb) throws IOException {
- id.appendTo(sb);
- }
-
- @Override
- public abstract StateTag<K, StateT> asKind(StateKind kind);
- }
-
- /**
- * A value state cell for values of type {@code T}.
- *
- * @param <T> the type of value being stored
- */
- private static class ValueStateTag<T> extends StateTagBase<Object, ValueState<T>>
- implements StateTag<Object, ValueState<T>> {
-
- private final Coder<T> coder;
-
- private ValueStateTag(StructuredId id, Coder<T> coder) {
- super(id);
- this.coder = coder;
- }
-
- @Override
- public ValueState<T> bind(StateBinder<? extends Object> visitor) {
- return visitor.bindValue(this, coder);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof ValueStateTag)) {
- return false;
- }
-
- ValueStateTag<?> that = (ValueStateTag<?>) obj;
- return Objects.equals(this.id, that.id)
- && Objects.equals(this.coder, that.coder);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass(), id, coder);
- }
-
- @Override
- public StateTag<Object, ValueState<T>> asKind(StateKind kind) {
- return new ValueStateTag<T>(id.asKind(kind), coder);
- }
- }
-
- /**
- * A state cell for values that are combined according to a {@link CombineFn}.
- *
- * @param <InputT> the type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- private static class CombiningValueStateTag<InputT, AccumT, OutputT>
- extends KeyedCombiningValueStateTag<Object, InputT, AccumT, OutputT>
- implements StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>,
- SystemStateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
-
- private final Coder<AccumT> accumCoder;
- private final CombineFn<InputT, AccumT, OutputT> combineFn;
-
- private CombiningValueStateTag(
- StructuredId id,
- Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
- super(id, accumCoder, combineFn.asKeyedFn());
- this.combineFn = combineFn;
- this.accumCoder = accumCoder;
- }
-
- @Override
- public StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
- asKind(StateKind kind) {
- return new CombiningValueStateTag<InputT, AccumT, OutputT>(
- id.asKind(kind), accumCoder, combineFn);
- }
- }
-
- /**
- * A state cell for values that are combined according to a {@link KeyedCombineFnWithContext}.
- *
- * @param <K> the type of keys
- * @param <InputT> the type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- private static class KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>
- extends StateTagBase<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
- implements SystemStateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
-
- private final Coder<AccumT> accumCoder;
- private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
-
- protected KeyedCombiningValueWithContextStateTag(
- StructuredId id,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
- super(id);
- this.combineFn = combineFn;
- this.accumCoder = accumCoder;
- }
-
- @Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
- StateBinder<? extends K> visitor) {
- return visitor.bindKeyedCombiningValueWithContext(this, accumCoder, combineFn);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof KeyedCombiningValueWithContextStateTag)) {
- return false;
- }
-
- KeyedCombiningValueWithContextStateTag<?, ?, ?, ?> that =
- (KeyedCombiningValueWithContextStateTag<?, ?, ?, ?>) obj;
- return Objects.equals(this.id, that.id)
- && Objects.equals(this.accumCoder, that.accumCoder);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass(), id, accumCoder);
- }
-
- @Override
- public StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> asKind(
- StateKind kind) {
- return new KeyedCombiningValueWithContextStateTag<>(
- id.asKind(kind), accumCoder, combineFn);
- }
-
- private StateTag<Object, BagState<AccumT>> asBagTag() {
- return new BagStateTag<AccumT>(id, accumCoder);
- }
- }
-
- /**
- * A state cell for values that are combined according to a {@link KeyedCombineFn}.
- *
- * @param <K> the type of keys
- * @param <InputT> the type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- private static class KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>
- extends StateTagBase<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
- implements SystemStateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
-
- private final Coder<AccumT> accumCoder;
- private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
- protected KeyedCombiningValueStateTag(
- StructuredId id,
- Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
- super(id);
- this.keyedCombineFn = keyedCombineFn;
- this.accumCoder = accumCoder;
- }
-
- @Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
- StateBinder<? extends K> visitor) {
- return visitor.bindKeyedCombiningValue(this, accumCoder, keyedCombineFn);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof CombiningValueStateTag)) {
- return false;
- }
-
- KeyedCombiningValueStateTag<?, ?, ?, ?> that = (KeyedCombiningValueStateTag<?, ?, ?, ?>) obj;
- return Objects.equals(this.id, that.id)
- && Objects.equals(this.accumCoder, that.accumCoder);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass(), id, accumCoder);
- }
-
- @Override
- public StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> asKind(
- StateKind kind) {
- return new KeyedCombiningValueStateTag<>(id.asKind(kind), accumCoder, keyedCombineFn);
- }
-
- private StateTag<Object, BagState<AccumT>> asBagTag() {
- return new BagStateTag<AccumT>(id, accumCoder);
- }
- }
-
- /**
- * A state cell optimized for bag-like access patterns (frequent additions, occasional reads
- * of all the values).
- *
- * @param <T> the type of value in the bag
- */
- private static class BagStateTag<T> extends StateTagBase<Object, BagState<T>>
- implements StateTag<Object, BagState<T>>{
-
- private final Coder<T> elemCoder;
-
- private BagStateTag(StructuredId id, Coder<T> elemCoder) {
- super(id);
- this.elemCoder = elemCoder;
- }
-
- @Override
- public BagState<T> bind(StateBinder<? extends Object> visitor) {
- return visitor.bindBag(this, elemCoder);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof BagStateTag)) {
- return false;
- }
-
- BagStateTag<?> that = (BagStateTag<?>) obj;
- return Objects.equals(this.id, that.id)
- && Objects.equals(this.elemCoder, that.elemCoder);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass(), id, elemCoder);
- }
-
- @Override
- public StateTag<Object, BagState<T>> asKind(StateKind kind) {
- return new BagStateTag<>(id.asKind(kind), elemCoder);
- }
- }
-
- private static class WatermarkStateTagInternal<W extends BoundedWindow>
- extends StateTagBase<Object, WatermarkHoldState<W>> {
-
- /**
- * When multiple output times are added to hold the watermark, this determines how they are
- * combined, and also the behavior when merging windows. Does not contribute to equality/hash
- * since we have at most one watermark hold tag per computation.
- */
- private final OutputTimeFn<? super W> outputTimeFn;
-
- private WatermarkStateTagInternal(StructuredId id, OutputTimeFn<? super W> outputTimeFn) {
- super(id);
- this.outputTimeFn = outputTimeFn;
- }
-
- @Override
- public WatermarkHoldState<W> bind(StateBinder<? extends Object> visitor) {
- return visitor.bindWatermark(this, outputTimeFn);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof WatermarkStateTagInternal)) {
- return false;
- }
-
- WatermarkStateTagInternal<?> that = (WatermarkStateTagInternal<?>) obj;
- return Objects.equals(this.id, that.id);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass(), id);
- }
-
- @Override
- public StateTag<Object, WatermarkHoldState<W>> asKind(StateKind kind) {
- return new WatermarkStateTagInternal<W>(id.asKind(kind), outputTimeFn);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ValueState.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ValueState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ValueState.java
deleted file mode 100644
index 19c12bb..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ValueState.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-
-/**
- * State holding a single value.
- *
- * @param <T> The type of values being stored.
- */
-@Experimental(Kind.STATE)
-public interface ValueState<T> extends ReadableState<T>, State {
- /**
- * Set the value of the buffer.
- */
- void write(T input);
-
- @Override
- ValueState<T> readLater();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/WatermarkHoldState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/WatermarkHoldState.java
deleted file mode 100644
index 8a1adc9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/WatermarkHoldState.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-
-import org.joda.time.Instant;
-
-/**
- * A {@link State} accepting and aggregating output timestamps, which determines
- * the time to which the output watermark must be held.
- *
- * <p><b><i>For internal use only. This API may change at any time.</i></b>
- */
-@Experimental(Kind.STATE)
-public interface WatermarkHoldState<W extends BoundedWindow>
- extends CombiningState<Instant, Instant> {
- /**
- * Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given
- * an element timestamp, and to combine watermarks from windows which are about to be merged.
- */
- OutputTimeFn<? super W> getOutputTimeFn();
-
- @Override
- WatermarkHoldState<W> readLater();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/KV.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/KV.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/KV.java
deleted file mode 100644
index 23cee07..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/KV.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.SerializableComparator;
-import com.google.common.base.MoreObjects;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Objects;
-
-/**
- * An immutable key/value pair.
- *
- * <p>Various {@link PTransform PTransforms} like {@link GroupByKey} and {@link Combine#perKey}
- * operate on {@link PCollection PCollections} of {@link KV KVs}.
- *
- * @param <K> the type of the key
- * @param <V> the type of the value
- */
-public class KV<K, V> implements Serializable {
- /** Returns a {@link KV} with the given key and value. */
- public static <K, V> KV<K, V> of(K key, V value) {
- return new KV<>(key, value);
- }
-
- /** Returns the key of this {@link KV}. */
- public K getKey() {
- return key;
- }
-
- /** Returns the value of this {@link KV}. */
- public V getValue() {
- return value;
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- final K key;
- final V value;
-
- private KV(K key, V value) {
- this.key = key;
- this.value = value;
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (!(other instanceof KV)) {
- return false;
- }
- KV<?, ?> otherKv = (KV<?, ?>) other;
- // Arrays are very common as values and keys, so deepEquals is mandatory
- return Objects.deepEquals(this.key, otherKv.key)
- && Objects.deepEquals(this.value, otherKv.value);
- }
-
- /**
- * A {@link Comparator} that orders {@link KV KVs} by the natural ordering of their keys.
- *
- * <p>A {@code null} key is less than any non-{@code null} key.
- */
- public static class OrderByKey<K extends Comparable<? super K>, V> implements
- SerializableComparator<KV<K, V>> {
- @Override
- public int compare(KV<K, V> a, KV<K, V> b) {
- if (a.key == null) {
- return b.key == null ? 0 : -1;
- } else if (b.key == null) {
- return 1;
- } else {
- return a.key.compareTo(b.key);
- }
- }
- }
-
- /**
- * A {@link Comparator} that orders {@link KV KVs} by the natural ordering of their values.
- *
- * <p>A {@code null} value is less than any non-{@code null} value.
- */
- public static class OrderByValue<K, V extends Comparable<? super V>>
- implements SerializableComparator<KV<K, V>> {
- @Override
- public int compare(KV<K, V> a, KV<K, V> b) {
- if (a.value == null) {
- return b.value == null ? 0 : -1;
- } else if (b.value == null) {
- return 1;
- } else {
- return a.value.compareTo(b.value);
- }
- }
- }
-
- @Override
- public int hashCode() {
- // Objects.deepEquals requires Arrays.deepHashCode for correctness
- return Arrays.deepHashCode(new Object[]{key, value});
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .addValue(key)
- .addValue(value)
- .toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PBegin.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PBegin.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PBegin.java
deleted file mode 100644
index 23ac3ae..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PBegin.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO.Read;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * The input to a root {@link PTransform}, such as {@link Read Read} or {@link Create}. It
- * carries only the {@link Pipeline} it belongs to and holds no {@link PValue PValues}.
- *
- * <p>Normally obtained via {@link Pipeline#begin} rather than constructed directly.
- */
-public class PBegin implements PInput {
-  /** The {@link Pipeline} this {@link PBegin} belongs to. */
-  private final Pipeline pipeline;
-
-  /**
-   * Constructs a {@link PBegin} in {@code pipeline}. Visible to subclasses; other code
-   * should prefer the {@link #in} factory.
-   */
-  protected PBegin(Pipeline pipeline) {
-    this.pipeline = pipeline;
-  }
-
-  /**
-   * Static factory returning a new {@link PBegin} in {@code pipeline}.
-   */
-  public static PBegin in(Pipeline pipeline) {
-    return new PBegin(pipeline);
-  }
-
-  /**
-   * Applies {@code t} to this {@link PBegin}, using the transform's own name for the
-   * application. Shorthand for {@link #apply(String, PTransform)} with that default name.
-   */
-  public <OutputT extends POutput> OutputT apply(
-      PTransform<? super PBegin, OutputT> t) {
-    return Pipeline.applyTransform(this, t);
-  }
-
-  /**
-   * Applies {@code t} to this {@link PBegin} under the explicit application {@code name}.
-   * The name identifies this node in the job graph, the monitoring UI, and logs.
-   */
-  public <OutputT extends POutput> OutputT apply(
-      String name, PTransform<? super PBegin, OutputT> t) {
-    return Pipeline.applyTransform(name, this, t);
-  }
-
-  /** Returns the {@link Pipeline} this {@link PBegin} was created in. */
-  @Override
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  /** Always empty, since a {@link PBegin} wraps no {@link PValue PValues}. */
-  @Override
-  public Collection<? extends PValue> expand() {
-    return Collections.emptyList();
-  }
-
-  /** No-op: a {@link PBegin} has nothing left to specify. */
-  @Override
-  public void finishSpecifying() {
-    // Intentionally empty.
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java
deleted file mode 100644
index 6fffddf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-
-/**
- * A {@link PCollection PCollection&lt;T&gt;} is an immutable collection of values of type
- * {@code T}. A {@link PCollection} can contain either a bounded or unbounded
- * number of elements. Bounded and unbounded {@link PCollection PCollections} are produced
- * as the output of {@link PTransform PTransforms}
- * (including root PTransforms like {@link Read} and {@link Create}), and can
- * be passed as the inputs of other PTransforms.
- *
- * <p>Some root transforms produce bounded {@code PCollections} and others
- * produce unbounded ones. For example, {@link TextIO.Read} reads a static set
- * of files, so it produces a bounded {@link PCollection}.
- * {@link PubsubIO.Read}, on the other hand, receives a potentially infinite stream
- * of Pubsub messages, so it produces an unbounded {@link PCollection}.
- *
- * <p>Each element in a {@link PCollection} may have an associated implicit
- * timestamp. Readers assign timestamps to elements when they create
- * {@link PCollection PCollections}, and other {@link PTransform PTransforms} propagate these
- * timestamps from their input to their output. For example, {@link PubsubIO.Read}
- * assigns pubsub message timestamps to elements, and {@link TextIO.Read} assigns
- * the default value {@link BoundedWindow#TIMESTAMP_MIN_VALUE} to elements. User code can
- * explicitly assign timestamps to elements with
- * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#outputWithTimestamp}.
- *
- * <p>Additionally, a {@link PCollection} has an associated
- * {@link WindowFn} and each element is assigned to a set of windows.
- * By default, the windowing function is {@link GlobalWindows} and all elements are assigned
- * into a single default window. This default can be overridden with the {@link Window}
- * {@link PTransform}.
- *
- * <p>See the individual {@link PTransform} subclasses for specific information
- * on how they propagate timestamps and windowing.
- *
- * @param <T> the type of the elements of this {@link PCollection}
- */
-public class PCollection<T> extends TypedPValue<T> {
-
-  /**
-   * The enumeration of cases for whether a {@link PCollection} is bounded.
-   */
-  public enum IsBounded {
-    /**
-     * Indicates that a {@link PCollection} contains bounded data elements, such as
-     * {@link PCollection PCollections} from {@link TextIO}, {@link BigQueryIO},
-     * {@link Create}, etc.
-     */
-    BOUNDED,
-    /**
-     * Indicates that a {@link PCollection} contains unbounded data elements, such as
-     * {@link PCollection PCollections} from {@link PubsubIO}.
-     */
-    UNBOUNDED;
-
-    /**
-     * Returns the composed IsBounded property.
-     *
-     * <p>The composed property is {@link #BOUNDED} only if all components are {@link #BOUNDED}.
-     * Otherwise, it is {@link #UNBOUNDED}.
-     */
-    public IsBounded and(IsBounded that) {
-      if (this == BOUNDED && that == BOUNDED) {
-        return BOUNDED;
-      } else {
-        return UNBOUNDED;
-      }
-    }
-  }
-
-  /**
-   * Returns the name of this {@link PCollection}.
-   *
-   * <p>By default, the name of a {@link PCollection} is based on the name of the
-   * {@link PTransform} that produces it. It can be specified explicitly by
-   * calling {@link #setName}.
-   *
-   * @throws IllegalStateException if the name hasn't been set yet
-   */
-  @Override
-  public String getName() {
-    return super.getName();
-  }
-
-  /**
-   * Sets the name of this {@link PCollection}. Returns {@code this}.
-   *
-   * @throws IllegalStateException if this {@link PCollection} has already been
-   * finalized and may no longer be set.
-   * Once {@link #apply} has been called, this will be the case.
-   */
-  @Override
-  public PCollection<T> setName(String name) {
-    super.setName(name);
-    return this;
-  }
-
-  /**
-   * Returns the {@link Coder} used by this {@link PCollection} to encode and decode
-   * the values stored in it.
-   *
-   * @throws IllegalStateException if the {@link Coder} hasn't been set, and
-   * couldn't be inferred.
-   */
-  @Override
-  public Coder<T> getCoder() {
-    return super.getCoder();
-  }
-
-  /**
-   * Sets the {@link Coder} used by this {@link PCollection} to encode and decode the
-   * values stored in it. Returns {@code this}.
-   *
-   * @throws IllegalStateException if this {@link PCollection} has already
-   * been finalized and may no longer be set.
-   * Once {@link #apply} has been called, this will be the case.
-   */
-  @Override
-  public PCollection<T> setCoder(Coder<T> coder) {
-    super.setCoder(coder);
-    return this;
-  }
-
-  /**
-   * Like {@link #apply(String, PTransform)} but defaulting to the name
-   * of the {@link PTransform}.
-   *
-   * @return the output of the applied {@link PTransform}
-   */
-  public <OutputT extends POutput> OutputT apply(PTransform<? super PCollection<T>, OutputT> t) {
-    return Pipeline.applyTransform(this, t);
-  }
-
-  /**
-   * Applies the given {@link PTransform} to this input {@link PCollection},
-   * using {@code name} to identify this specific application of the transform.
-   * This name is used in various places, including the monitoring UI, logging,
-   * and to stably identify this application node in the job graph.
-   *
-   * @return the output of the applied {@link PTransform}
-   */
-  public <OutputT extends POutput> OutputT apply(
-      String name, PTransform<? super PCollection<T>, OutputT> t) {
-    return Pipeline.applyTransform(name, this, t);
-  }
-
-  /**
-   * Returns the {@link WindowingStrategy} of this {@link PCollection}.
-   */
-  public WindowingStrategy<?, ?> getWindowingStrategy() {
-    return windowingStrategy;
-  }
-
-  /** Returns the {@link IsBounded} property of this {@link PCollection}. */
-  public IsBounded isBounded() {
-    return isBounded;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal details below here.
-
-  /**
-   * {@link WindowingStrategy} that will be used for merging windows and triggering output in this
-   * {@link PCollection} and subsequent {@link PCollection PCollections} produced from this one.
-   *
-   * <p>By default, no merging is performed.
-   */
-  private WindowingStrategy<?, ?> windowingStrategy;
-
-  private IsBounded isBounded;
-
-  private PCollection(Pipeline p) {
-    super(p);
-  }
-
-  /**
-   * Sets the {@link TypeDescriptor TypeDescriptor&lt;T&gt;} for this
-   * {@link PCollection PCollection&lt;T&gt;}. This may allow the enclosing
-   * {@link PCollectionTuple}, {@link PCollectionList}, or {@code PTransform<?, PCollection<T>>},
-   * etc., to provide more detailed reflective information.
-   */
-  @Override
-  public PCollection<T> setTypeDescriptorInternal(TypeDescriptor<T> typeDescriptor) {
-    super.setTypeDescriptorInternal(typeDescriptor);
-    return this;
-  }
-
-  /**
-   * Sets the {@link WindowingStrategy} of this {@link PCollection}.
-   *
-   * <p>For use by primitive transformations only.
-   */
-  public PCollection<T> setWindowingStrategyInternal(WindowingStrategy<?, ?> windowingStrategy) {
-    this.windowingStrategy = windowingStrategy;
-    return this;
-  }
-
-  /**
-   * Sets the {@link PCollection.IsBounded} of this {@link PCollection}.
-   *
-   * <p>For use by internal transformations only.
-   */
-  public PCollection<T> setIsBoundedInternal(IsBounded isBounded) {
-    this.isBounded = isBounded;
-    return this;
-  }
-
-  /**
-   * Creates and returns a new {@link PCollection} for a primitive output.
-   *
-   * <p>For use by primitive transformations only.
-   */
-  public static <T> PCollection<T> createPrimitiveOutputInternal(
-      Pipeline pipeline,
-      WindowingStrategy<?, ?> windowingStrategy,
-      IsBounded isBounded) {
-    return new PCollection<T>(pipeline)
-        .setWindowingStrategyInternal(windowingStrategy)
-        .setIsBoundedInternal(isBounded);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionList.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionList.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionList.java
deleted file mode 100644
index b99af02..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionList.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Partition;
-import com.google.common.collect.ImmutableList;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * A {@link PCollectionList PCollectionList&lt;T&gt;} is an immutable list of homogeneously
- * typed {@link PCollection PCollection&lt;T&gt;s}. A {@link PCollectionList} is used, for
- * instance, as the input to
- * {@link Flatten} or the output of {@link Partition}.
- *
- * <p>PCollectionLists can be created and accessed as follows:
- * <pre> {@code
- * PCollection<String> pc1 = ...;
- * PCollection<String> pc2 = ...;
- * PCollection<String> pc3 = ...;
- *
- * // Create a PCollectionList with three PCollections:
- * PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);
- *
- * // Create an empty PCollectionList:
- * Pipeline p = ...;
- * PCollectionList<String> pcs2 = PCollectionList.<String>empty(p);
- *
- * // Get PCollections out of a PCollectionList, by index (origin 0):
- * PCollection<String> pcX = pcs.get(1);
- * PCollection<String> pcY = pcs.get(0);
- * PCollection<String> pcZ = pcs.get(2);
- *
- * // Get a list of all PCollections in a PCollectionList:
- * List<PCollection<String>> allPcs = pcs.getAll();
- * } </pre>
- *
- * @param <T> the type of the elements of all the {@link PCollection PCollections} in this list
- */
-public class PCollectionList<T> implements PInput, POutput {
-  /**
-   * Returns an empty {@link PCollectionList} that is part of the given {@link Pipeline}.
-   *
-   * <p>Longer {@link PCollectionList PCollectionLists} can be created by calling
-   * {@link #and} on the result.
-   */
-  public static <T> PCollectionList<T> empty(Pipeline pipeline) {
-    return new PCollectionList<>(pipeline);
-  }
-
-  /**
-   * Returns a singleton {@link PCollectionList} containing the given {@link PCollection}.
-   *
-   * <p>Longer {@link PCollectionList PCollectionLists} can be created by calling
-   * {@link #and} on the result.
-   */
-  public static <T> PCollectionList<T> of(PCollection<T> pc) {
-    return new PCollectionList<T>(pc.getPipeline()).and(pc);
-  }
-
-  /**
-   * Returns a {@link PCollectionList} containing the given {@link PCollection PCollections},
-   * in order.
-   *
-   * <p>The argument list cannot be empty; if it is, {@link IllegalArgumentException} is thrown.
-   *
-   * <p>All the {@link PCollection PCollections} in the resulting {@link PCollectionList} must be
-   * part of the same {@link Pipeline}; otherwise {@link IllegalArgumentException} is thrown.
-   *
-   * <p>Longer PCollectionLists can be created by calling
-   * {@link #and} on the result.
-   */
-  public static <T> PCollectionList<T> of(Iterable<PCollection<T>> pcs) {
-    Iterator<PCollection<T>> pcsIter = pcs.iterator();
-    if (!pcsIter.hasNext()) {
-      throw new IllegalArgumentException(
-          "must either have a non-empty list of PCollections, " +
-          "or must first call empty(Pipeline)");
-    }
-    return new PCollectionList<T>(pcsIter.next().getPipeline()).and(pcs);
-  }
-
-  /**
-   * Returns a new {@link PCollectionList} that has all the {@link PCollection PCollections} of
-   * this {@link PCollectionList} plus the given {@link PCollection} appended to the end.
-   *
-   * <p>All the {@link PCollection PCollections} in the resulting {@link PCollectionList} must be
-   * part of the same {@link Pipeline}; otherwise {@link IllegalArgumentException} is thrown.
-   */
-  public PCollectionList<T> and(PCollection<T> pc) {
-    if (pc.getPipeline() != pipeline) {
-      throw new IllegalArgumentException(
-          "PCollections come from different Pipelines");
-    }
-    return new PCollectionList<>(pipeline,
-        new ImmutableList.Builder<PCollection<T>>()
-            .addAll(pcollections)
-            .add(pc)
-            .build());
-  }
-
-  /**
-   * Returns a new {@link PCollectionList} that has all the {@link PCollection PCollections} of
-   * this {@link PCollectionList} plus the given {@link PCollection PCollections} appended to the
-   * end, in order.
-   *
-   * <p>All the {@link PCollection PCollections} in the resulting {@link PCollectionList} must be
-   * part of the same {@link Pipeline}; otherwise {@link IllegalArgumentException} is thrown.
-   */
-  public PCollectionList<T> and(Iterable<PCollection<T>> pcs) {
-    List<PCollection<T>> copy = new ArrayList<>(pcollections);
-    for (PCollection<T> pc : pcs) {
-      if (pc.getPipeline() != pipeline) {
-        throw new IllegalArgumentException(
-            "PCollections come from different Pipelines");
-      }
-      copy.add(pc);
-    }
-    return new PCollectionList<>(pipeline, copy);
-  }
-
-  /**
-   * Returns the number of {@link PCollection PCollections} in this {@link PCollectionList}.
-   */
-  public int size() {
-    return pcollections.size();
-  }
-
-  /**
-   * Returns the {@link PCollection} at the given index (origin zero).
-   *
-   * @throws IndexOutOfBoundsException if the index is out of the range
-   * {@code [0..size()-1]}.
-   */
-  public PCollection<T> get(int index) {
-    return pcollections.get(index);
-  }
-
-  /**
-   * Returns an immutable List of all the {@link PCollection PCollections} in this
-   * {@link PCollectionList}.
-   */
-  public List<PCollection<T>> getAll() {
-    return pcollections;
-  }
-
-  /**
-   * Like {@link #apply(String, PTransform)} but defaulting to the name
-   * of the {@code PTransform}.
-   */
-  public <OutputT extends POutput> OutputT apply(
-      PTransform<PCollectionList<T>, OutputT> t) {
-    return Pipeline.applyTransform(this, t);
-  }
-
-  /**
-   * Applies the given {@link PTransform} to this input {@link PCollectionList},
-   * using {@code name} to identify this specific application of the transform.
-   * This name is used in various places, including the monitoring UI, logging,
-   * and to stably identify this application node in the job graph.
-   *
-   * @return the output of the applied {@link PTransform}
-   */
-  public <OutputT extends POutput> OutputT apply(
-      String name, PTransform<PCollectionList<T>, OutputT> t) {
-    return Pipeline.applyTransform(name, this, t);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal details below here.
-
-  final Pipeline pipeline;
-  final List<PCollection<T>> pcollections;
-
-  PCollectionList(Pipeline pipeline) {
-    this(pipeline, new ArrayList<PCollection<T>>());
-  }
-
-  PCollectionList(Pipeline pipeline, List<PCollection<T>> pcollections) {
-    this.pipeline = pipeline;
-    this.pcollections = Collections.unmodifiableList(pcollections);
-  }
-
-  @Override
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  @Override
-  public Collection<? extends PValue> expand() {
-    return pcollections;
-  }
-
-  @Override
-  public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {
-    int i = 0;
-    for (PCollection<T> pc : pcollections) {
-      pc.recordAsOutput(transform, "out" + i);
-      i++;
-    }
-  }
-
-  @Override
-  public void finishSpecifying() {
-    for (PCollection<T> pc : pcollections) {
-      pc.finishSpecifying();
-    }
-  }
-
-  @Override
-  public void finishSpecifyingOutput() {
-    for (PCollection<T> pc : pcollections) {
-      pc.finishSpecifyingOutput();
-    }
-  }
-}