You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:27 UTC
[03/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionTuple.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionTuple.java
deleted file mode 100644
index 58550e4..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionTuple.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.common.collect.ImmutableMap;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * A {@link PCollectionTuple} is an immutable tuple of
- * heterogeneously-typed {@link PCollection PCollections}, "keyed" by
- * {@link TupleTag TupleTags}. A {@link PCollectionTuple} can be used as the input or
- * output of a
- * {@link PTransform} taking
- * or producing multiple PCollection inputs or outputs that can be of
- * different types, for instance a
- * {@link ParDo} with side
- * outputs.
- *
- * <p>A {@link PCollectionTuple} can be created and accessed as follows:
- * <pre> {@code
- * PCollection<String> pc1 = ...;
- * PCollection<Integer> pc2 = ...;
- * PCollection<Iterable<String>> pc3 = ...;
- *
- * // Create TupleTags for each of the PCollections to put in the
- * // PCollectionTuple (the type of the TupleTag enables tracking the
- * // static type of each of the PCollections in the PCollectionTuple):
- * TupleTag<String> tag1 = new TupleTag<>();
- * TupleTag<Integer> tag2 = new TupleTag<>();
- * TupleTag<Iterable<String>> tag3 = new TupleTag<>();
- *
- * // Create a PCollectionTuple with three PCollections:
- * PCollectionTuple pcs =
- * PCollectionTuple.of(tag1, pc1)
- * .and(tag2, pc2)
- * .and(tag3, pc3);
- *
- * // Create an empty PCollectionTuple:
- * Pipeline p = ...;
- * PCollectionTuple pcs2 = PCollectionTuple.empty(p);
- *
- * // Get PCollections out of a PCollectionTuple, using the same tags
- * // that were used to put them in:
- * PCollection<Integer> pcX = pcs.get(tag2);
- * PCollection<String> pcY = pcs.get(tag1);
- * PCollection<Iterable<String>> pcZ = pcs.get(tag3);
- *
- * // Get a map of all PCollections in a PCollectionTuple:
- * Map<TupleTag<?>, PCollection<?>> allPcs = pcs.getAll();
- * } </pre>
- */
-public class PCollectionTuple implements PInput, POutput {
- /**
- * Returns an empty {@link PCollectionTuple} that is part of the given {@link Pipeline}.
- *
- * <p>A {@link PCollectionTuple} containing additional elements can be created by calling
- * {@link #and} on the result.
- */
- public static PCollectionTuple empty(Pipeline pipeline) {
- return new PCollectionTuple(pipeline);
- }
-
- /**
- * Returns a singleton {@link PCollectionTuple} containing the given
- * {@link PCollection} keyed by the given {@link TupleTag}.
- *
- * <p>A {@link PCollectionTuple} containing additional elements can be created by calling
- * {@link #and} on the result.
- */
- public static <T> PCollectionTuple of(TupleTag<T> tag, PCollection<T> pc) {
- return empty(pc.getPipeline()).and(tag, pc);
- }
-
- /**
- * Returns a new {@link PCollectionTuple} that has each {@link PCollection} and
- * {@link TupleTag} of this {@link PCollectionTuple} plus the given {@link PCollection}
- * associated with the given {@link TupleTag}.
- *
- * <p>The given {@link TupleTag} should not already be mapped to a
- * {@link PCollection} in this {@link PCollectionTuple}.
- *
- * <p>Each {@link PCollection} in the resulting {@link PCollectionTuple} must be
- * part of the same {@link Pipeline}.
- */
- public <T> PCollectionTuple and(TupleTag<T> tag, PCollection<T> pc) {
- if (pc.getPipeline() != pipeline) {
- throw new IllegalArgumentException(
- "PCollections come from different Pipelines");
- }
-
- return new PCollectionTuple(pipeline,
- new ImmutableMap.Builder<TupleTag<?>, PCollection<?>>()
- .putAll(pcollectionMap)
- .put(tag, pc)
- .build());
- }
-
- /**
- * Returns whether this {@link PCollectionTuple} contains a {@link PCollection} with
- * the given tag.
- */
- public <T> boolean has(TupleTag<T> tag) {
- return pcollectionMap.containsKey(tag);
- }
-
- /**
- * Returns the {@link PCollection} associated with the given {@link TupleTag}
- * in this {@link PCollectionTuple}. Throws {@link IllegalArgumentException} if there is no
- * such {@link PCollection}, i.e., {@code !has(tag)}.
- */
- public <T> PCollection<T> get(TupleTag<T> tag) {
- @SuppressWarnings("unchecked")
- PCollection<T> pcollection = (PCollection<T>) pcollectionMap.get(tag);
- if (pcollection == null) {
- throw new IllegalArgumentException(
- "TupleTag not found in this PCollectionTuple tuple");
- }
- return pcollection;
- }
-
- /**
- * Returns an immutable Map from {@link TupleTag} to corresponding
- * {@link PCollection}, for all the members of this {@link PCollectionTuple}.
- */
- public Map<TupleTag<?>, PCollection<?>> getAll() {
- return pcollectionMap;
- }
-
- /**
- * Like {@link #apply(String, PTransform)} but defaulting to the name
- * of the {@link PTransform}.
- *
- * @return the output of the applied {@link PTransform}
- */
- public <OutputT extends POutput> OutputT apply(
- PTransform<PCollectionTuple, OutputT> t) {
- return Pipeline.applyTransform(this, t);
- }
-
- /**
- * Applies the given {@link PTransform} to this input {@link PCollectionTuple},
- * using {@code name} to identify this specific application of the transform.
- * This name is used in various places, including the monitoring UI, logging,
- * and to stably identify this application node in the job graph.
- *
- * @return the output of the applied {@link PTransform}
- */
- public <OutputT extends POutput> OutputT apply(
- String name, PTransform<PCollectionTuple, OutputT> t) {
- return Pipeline.applyTransform(name, this, t);
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
- // Internal details below here.
-
- Pipeline pipeline;
- final Map<TupleTag<?>, PCollection<?>> pcollectionMap;
-
- PCollectionTuple(Pipeline pipeline) {
- this(pipeline, new LinkedHashMap<TupleTag<?>, PCollection<?>>());
- }
-
- PCollectionTuple(Pipeline pipeline,
- Map<TupleTag<?>, PCollection<?>> pcollectionMap) {
- this.pipeline = pipeline;
- this.pcollectionMap = Collections.unmodifiableMap(pcollectionMap);
- }
-
- /**
- * Returns a {@link PCollectionTuple} with each of the given tags mapping to a new
- * output {@link PCollection}.
- *
- * <p>For use by primitive transformations only.
- */
- public static PCollectionTuple ofPrimitiveOutputsInternal(
- Pipeline pipeline,
- TupleTagList outputTags,
- WindowingStrategy<?, ?> windowingStrategy,
- IsBounded isBounded) {
- Map<TupleTag<?>, PCollection<?>> pcollectionMap = new LinkedHashMap<>();
- for (TupleTag<?> outputTag : outputTags.tupleTags) {
- if (pcollectionMap.containsKey(outputTag)) {
- throw new IllegalArgumentException(
- "TupleTag already present in this tuple");
- }
-
- // In fact, `token` and `outputCollection` should have
- // types TypeDescriptor<T> and PCollection<T> for some
- // unknown T. It is safe to create `outputCollection`
- // with type PCollection<Object> because it has the same
- // erasure as the correct type. When a transform adds
- // elements to `outputCollection` they will be of type T.
- @SuppressWarnings("unchecked")
- TypeDescriptor<Object> token = (TypeDescriptor<Object>) outputTag.getTypeDescriptor();
- PCollection<Object> outputCollection = PCollection
- .createPrimitiveOutputInternal(pipeline, windowingStrategy, isBounded)
- .setTypeDescriptorInternal(token);
-
- pcollectionMap.put(outputTag, outputCollection);
- }
- return new PCollectionTuple(pipeline, pcollectionMap);
- }
-
- @Override
- public Pipeline getPipeline() {
- return pipeline;
- }
-
- @Override
- public Collection<? extends PValue> expand() {
- return pcollectionMap.values();
- }
-
- @Override
- public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {
- int i = 0;
- for (Map.Entry<TupleTag<?>, PCollection<?>> entry
- : pcollectionMap.entrySet()) {
- TupleTag<?> tag = entry.getKey();
- PCollection<?> pc = entry.getValue();
- pc.recordAsOutput(transform, tag.getOutName(i));
- i++;
- }
- }
-
- @Override
- public void finishSpecifying() {
- for (PCollection<?> pc : pcollectionMap.values()) {
- pc.finishSpecifying();
- }
- }
-
- @Override
- public void finishSpecifyingOutput() {
- for (PCollection<?> pc : pcollectionMap.values()) {
- pc.finishSpecifyingOutput();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionView.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionView.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionView.java
deleted file mode 100644
index 515e21b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollectionView.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-
-import java.io.Serializable;
-
-/**
- * A {@link PCollectionView PCollectionView&lt;T&gt;} is an immutable view of a {@link PCollection}
- * as a value of type {@code T} that can be accessed
- * as a side input to a {@link ParDo} transform.
- *
- * <p>A {@link PCollectionView} should always be the output of a
- * {@link com.google.cloud.dataflow.sdk.transforms.PTransform}. It is the joint responsibility of
- * this transform and each {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner} to implement
- * the view in a runner-specific manner.
- *
- * <p>The most common case is using the {@link View} transforms to prepare a {@link PCollection}
- * for use as a side input to {@link ParDo}. See {@link View#asSingleton()},
- * {@link View#asIterable()}, and {@link View#asMap()} for more detail on specific views
- * available in the SDK.
- *
- * @param <T> the type of the value(s) accessible via this {@link PCollectionView}
- */
-public interface PCollectionView<T> extends PValue, Serializable {
- /**
- * A unique identifier, for internal use.
- */
- public TupleTag<Iterable<WindowedValue<?>>> getTagInternal();
-
- /**
- * For internal use only.
- */
- public T fromIterableInternal(Iterable<WindowedValue<?>> contents);
-
- /**
- * For internal use only.
- */
- public WindowingStrategy<?, ?> getWindowingStrategyInternal();
-
- /**
- * For internal use only.
- */
- public Coder<Iterable<WindowedValue<?>>> getCoderInternal();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PDone.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PDone.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PDone.java
deleted file mode 100644
index 39a0061..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PDone.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * {@link PDone} is the output of a {@link PTransform} that has a trivial result,
- * such as a {@link Write}.
- */
-public class PDone extends POutputValueBase {
-
- /**
- * Creates a {@link PDone} in the given {@link Pipeline}.
- */
- public static PDone in(Pipeline pipeline) {
- return new PDone(pipeline);
- }
-
- @Override
- public Collection<? extends PValue> expand() {
- // A PDone contains no PValues.
- return Collections.emptyList();
- }
-
- private PDone(Pipeline pipeline) {
- super(pipeline);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PInput.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PInput.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PInput.java
deleted file mode 100644
index 89b097a..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PInput.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-
-import java.util.Collection;
-
-/**
- * The interface for things that might be input to a
- * {@link com.google.cloud.dataflow.sdk.transforms.PTransform}.
- */
-public interface PInput {
- /**
- * Returns the owning {@link Pipeline} of this {@link PInput}.
- */
- public Pipeline getPipeline();
-
- /**
- * Expands this {@link PInput} into a list of its component output
- * {@link PValue PValues}.
- *
- * <ul>
- * <li>A {@link PValue} expands to itself.</li>
- * <li>A tuple or list of {@link PValue PValues} (such as
- * {@link PCollectionTuple} or {@link PCollectionList})
- * expands to its component {@link PValue PValues}.</li>
- * </ul>
- *
- * <p>Not intended to be invoked directly by user code.
- */
- public Collection<? extends PValue> expand();
-
- /**
- * <p>After building, finalizes this {@code PInput} to make it ready for
- * being used as an input to a {@link com.google.cloud.dataflow.sdk.transforms.PTransform}.
- *
- * <p>Automatically invoked whenever {@code apply()} is invoked on
- * this {@code PInput}, so users do not normally call this explicitly.
- */
- public void finishSpecifying();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/POutput.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/POutput.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/POutput.java
deleted file mode 100644
index f99bc0b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/POutput.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-import java.util.Collection;
-
-/**
- * The interface for things that might be output from a {@link PTransform}.
- */
-public interface POutput {
-
- /**
- * Returns the owning {@link Pipeline} of this {@link POutput}.
- */
- public Pipeline getPipeline();
-
- /**
- * Expands this {@link POutput} into a list of its component output
- * {@link PValue PValues}.
- *
- * <ul>
- * <li>A {@link PValue} expands to itself.</li>
- * <li>A tuple or list of {@link PValue PValues} (such as
- * {@link PCollectionTuple} or {@link PCollectionList})
- * expands to its component {@link PValue PValues}.</li>
- * </ul>
- *
- * <p>Not intended to be invoked directly by user code.
- */
- public Collection<? extends PValue> expand();
-
- /**
- * Records that this {@code POutput} is an output of the given
- * {@code PTransform}.
- *
- * <p>For a compound {@code POutput}, it is advised to call
- * this method on each component {@code POutput}.
- *
- * <p>This is not intended to be invoked by user code, but
- * is automatically invoked as part of applying the
- * producing {@link PTransform}.
- */
- public void recordAsOutput(AppliedPTransform<?, ?, ?> transform);
-
- /**
- * As part of applying the producing {@link PTransform}, finalizes this
- * output to make it ready for being used as an input and for running.
- *
- * <p>This includes ensuring that all {@link PCollection PCollections}
- * have {@link com.google.cloud.dataflow.sdk.coders.Coder Coders} specified or defaulted.
- *
- * <p>Automatically invoked whenever this {@link POutput} is used
- * as a {@link PInput} to another {@link PTransform}, or if never
- * used as a {@link PInput}, when {@link Pipeline#run}
- * is called, so users do not normally call this explicitly.
- */
- public void finishSpecifyingOutput();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/POutputValueBase.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/POutputValueBase.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/POutputValueBase.java
deleted file mode 100644
index 69e04c3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/POutputValueBase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-/**
- * A {@link POutputValueBase} is the abstract base class of
- * {@code PTransform} outputs.
- *
- * <p>A {@link POutput} that adds tracking of its producing
- * {@link AppliedPTransform}.
- *
- * <p>For internal use.
- */
-public abstract class POutputValueBase implements POutput {
-
- private final Pipeline pipeline;
-
- protected POutputValueBase(Pipeline pipeline) {
- this.pipeline = pipeline;
- }
-
- /**
- * No-arg constructor for Java serialization only.
- * The resulting {@link POutputValueBase} is unlikely to be
- * valid.
- */
- protected POutputValueBase() {
- pipeline = null;
- }
-
- @Override
- public Pipeline getPipeline() {
- return pipeline;
- }
-
- /**
- * Returns the {@link AppliedPTransform} that this {@link POutputValueBase}
- * is an output of.
- *
- * <p>For internal use only.
- */
- public AppliedPTransform<?, ?, ?> getProducingTransformInternal() {
- return producingTransform;
- }
-
- /**
- * Records that this {@link POutputValueBase} is an output of the
- * given {@link AppliedPTransform}.
- *
- * <p>To be invoked only by {@link POutput#recordAsOutput}
- * implementations. Not to be invoked directly by user code.
- */
- @Override
- public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {
- if (producingTransform != null) {
- // Already used this POutput as a PTransform output. This can
- // happen if the POutput is an output of a transform within a
- // composite transform, and is also the result of the composite.
- // We want to record the "immediate" atomic transform producing
- // this output, and ignore all later composite transforms that
- // also produce this output.
- //
- // Pipeline.applyInternal() uses !hasProducingTransform() to
- // avoid calling this operation redundantly, but
- // hasProducingTransform() doesn't apply to POutputValueBases
- // that aren't PValues or composites of PValues, e.g., PDone.
- return;
- }
- producingTransform = transform;
- }
-
- /**
- * Default behavior for {@link #finishSpecifyingOutput()} is
- * to do nothing. Override if your {@link PValue} requires
- * finalization.
- */
- @Override
- public void finishSpecifyingOutput() { }
-
- /**
- * The applied {@link PTransform} that produces this {@link POutputValueBase}.
- */
- private AppliedPTransform<?, ?, ?> producingTransform;
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PValue.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PValue.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PValue.java
deleted file mode 100644
index eb95a23..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PValue.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-/**
- * The interface for values that can be input to and output from {@link PTransform PTransforms}.
- */
-public interface PValue extends POutput, PInput {
-
- /**
- * Returns the name of this {@link PValue}.
- */
- public String getName();
-
- /**
- * Returns the {@link AppliedPTransform} that this {@link PValue} is an output of.
- *
- * <p>For internal use only.
- */
- public AppliedPTransform<?, ?, ?> getProducingTransformInternal();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PValueBase.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PValueBase.java
deleted file mode 100644
index 7e57204..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PValueBase.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.util.StringUtils;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * A {@link PValueBase} is an abstract base class that provides
- * sensible default implementations for methods of {@link PValue}.
- * In particular, this includes functionality for getting/setting:
- *
- * <ul>
- * <li> The {@link Pipeline} that the {@link PValue} is part of.</li>
- * <li> Whether the {@link PValue} has been finalized (as an input
- * or an output), after which its properties can no longer be changed.</li>
- * </ul>
- *
- * <p>For internal use.
- */
-public abstract class PValueBase extends POutputValueBase implements PValue {
- /**
- * Returns the name of this {@link PValueBase}.
- *
- * <p>By default, the name of a {@link PValueBase} is based on the name of the
- * {@link com.google.cloud.dataflow.sdk.transforms.PTransform} that produces it.
- * It can be specified explicitly by calling {@link #setName}.
- *
- * @throws IllegalStateException if the name hasn't been set yet
- */
- @Override
- public String getName() {
- if (name == null) {
- throw new IllegalStateException("name not set");
- }
- return name;
- }
-
- /**
- * Sets the name of this {@link PValueBase}. Returns {@code this}.
- *
- * @throws IllegalStateException if this {@link PValueBase} has
- * already been finalized and its name may no longer be changed.
- */
- public PValueBase setName(String name) {
- if (finishedSpecifying) {
- throw new IllegalStateException(
- "cannot change the name of " + this + " once it's been used");
- }
- this.name = name;
- return this;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- protected PValueBase(Pipeline pipeline) {
- super(pipeline);
- }
-
- /**
- * No-arg constructor for Java serialization only.
- * The resulting {@link PValueBase} is unlikely to be
- * valid.
- */
- protected PValueBase() {
- super();
- }
-
- /**
- * The name of this {@link PValueBase}, or null if not yet set.
- */
- private String name;
-
- /**
- * Whether this {@link PValueBase} has been finalized, and its core
- * properties, e.g., name, can no longer be changed.
- */
- private boolean finishedSpecifying = false;
-
- @Override
- public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {
- recordAsOutput(transform, "out");
- }
-
- /**
- * Records that this {@link PValueBase} is an output of the given
- * {@link AppliedPTransform}, using {@code outName} to derive a default
- * name for this value if no name has been set yet.
- *
- * <p>To be invoked only by {@link POutput#recordAsOutput}
- * implementations. Not to be invoked directly by user code.
- */
- protected void recordAsOutput(AppliedPTransform<?, ?, ?> transform,
- String outName) {
- super.recordAsOutput(transform);
- if (name == null) {
- name = transform.getFullName() + "." + outName;
- }
- }
-
- /**
- * Returns whether this {@link PValueBase} has been finalized, and
- * its core properties, e.g., name, can no longer be changed.
- *
- * <p>For internal use only.
- */
- public boolean isFinishedSpecifyingInternal() {
- return finishedSpecifying;
- }
-
- @Override
- public Collection<? extends PValue> expand() {
- return Collections.singletonList(this);
- }
-
- @Override
- public void finishSpecifying() {
- finishSpecifyingOutput();
- finishedSpecifying = true;
- }
-
- @Override
- public String toString() {
- return (name == null ? "<unnamed>" : getName())
- + " [" + getKindString() + "]";
- }
-
- /**
- * Returns a {@link String} capturing the kind of this
- * {@link PValueBase}.
- *
- * <p>By default, uses the base name of the current class as its kind string.
- */
- protected String getKindString() {
- return StringUtils.approximateSimpleName(getClass());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TimestampedValue.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TimestampedValue.java
deleted file mode 100644
index 1085d44..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TimestampedValue.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.InstantCoder;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * An immutable pairing of a value with the timestamp associated with it.
- *
- * <p>The timestamp of a value determines many properties, such as its assignment to
- * windows and whether the value is late (with respect to the watermark of a {@link PCollection}).
- *
- * @param <V> the type of the value
- */
-public class TimestampedValue<V> {
-
-  private final V value;
-  private final Instant timestamp;
-
-  /** Stores both components exactly as given; nothing is validated or copied. */
-  protected TimestampedValue(V value, Instant timestamp) {
-    this.value = value;
-    this.timestamp = timestamp;
-  }
-
-  /**
-   * Creates a {@code TimestampedValue} holding the given value and timestamp.
-   */
-  public static <V> TimestampedValue<V> of(V value, Instant timestamp) {
-    return new TimestampedValue<>(value, timestamp);
-  }
-
-  /** Returns the value component of this pair. */
-  public V getValue() {
-    return value;
-  }
-
-  /** Returns the timestamp component of this pair. */
-  public Instant getTimestamp() {
-    return timestamp;
-  }
-
-  /**
-   * Two {@code TimestampedValue}s are equal when both their values and their
-   * timestamps are equal according to {@link Objects#equals}.
-   */
-  @Override
-  public boolean equals(Object other) {
-    if (other instanceof TimestampedValue) {
-      TimestampedValue<?> that = (TimestampedValue<?>) other;
-      return Objects.equals(value, that.value) && Objects.equals(timestamp, that.timestamp);
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(value, timestamp);
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder text = new StringBuilder("TimestampedValue(");
-    text.append(value).append(", ").append(timestamp).append(")");
-    return text.toString();
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A {@link Coder} for {@link TimestampedValue} that writes the value with the
-   * supplied value coder, followed by the timestamp with {@link InstantCoder}.
-   */
-  public static class TimestampedValueCoder<T>
-      extends StandardCoder<TimestampedValue<T>> {
-
-    private final Coder<T> valueCoder;
-
-    /** Returns a coder that encodes the value component with {@code valueCoder}. */
-    public static <T> TimestampedValueCoder<T> of(Coder<T> valueCoder) {
-      return new TimestampedValueCoder<>(valueCoder);
-    }
-
-    /**
-     * JSON factory: builds the coder from its component encodings, which must
-     * consist of exactly one element, the value coder.
-     *
-     * @throws IllegalArgumentException if {@code components} does not have exactly one element
-     */
-    @JsonCreator
-    public static TimestampedValueCoder<?> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Object> components) {
-      checkArgument(components.size() == 1,
-          "Expecting 1 component, got " + components.size());
-      Coder<?> onlyComponent = (Coder<?>) components.get(0);
-      return of(onlyComponent);
-    }
-
-    @SuppressWarnings("unchecked")
-    TimestampedValueCoder(Coder<T> valueCoder) {
-      this.valueCoder = checkNotNull(valueCoder);
-    }
-
-    /**
-     * Writes the value in the nested context, then the timestamp in the
-     * context that was passed in.
-     */
-    @Override
-    public void encode(TimestampedValue<T> windowedElem,
-                       OutputStream outStream,
-                       Context context)
-        throws IOException {
-      T elementValue = windowedElem.getValue();
-      valueCoder.encode(elementValue, outStream, context.nested());
-      Instant elementTimestamp = windowedElem.getTimestamp();
-      InstantCoder.of().encode(elementTimestamp, outStream, context);
-    }
-
-    /** Reads back a value and a timestamp in the order {@link #encode} wrote them. */
-    @Override
-    public TimestampedValue<T> decode(InputStream inStream, Context context)
-        throws IOException {
-      T decodedValue = valueCoder.decode(inStream, context.nested());
-      Instant decodedTimestamp = InstantCoder.of().decode(inStream, context);
-      return TimestampedValue.of(decodedValue, decodedTimestamp);
-    }
-
-    /** Delegates the determinism check to the value coder. */
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic(
-          "TimestampedValueCoder requires a deterministic valueCoder",
-          valueCoder);
-    }
-
-    /** Returns a one-element list holding the value coder. */
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      List<Coder<?>> arguments = Arrays.<Coder<?>>asList(valueCoder);
-      return arguments;
-    }
-
-    /** Returns a one-element list holding the value component of {@code exampleValue}. */
-    public static <T> List<Object> getInstanceComponents(TimestampedValue<T> exampleValue) {
-      Object component = exampleValue.getValue();
-      return Arrays.<Object>asList(component);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TupleTag.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TupleTag.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TupleTag.java
deleted file mode 100644
index 7494921..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TupleTag.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-import static com.google.cloud.dataflow.sdk.util.Structs.addString;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.Serializable;
-import java.util.Random;
-
-/**
- * A {@link TupleTag} is a typed tag used as the key of a
- * heterogeneously typed tuple, such as a {@link PCollectionTuple}.
- * Its generic type parameter tracks the static type of the thing
- * stored under the tag.
- *
- * <p>To aid in assigning default {@link Coder Coders} for results of
- * side outputs of {@link ParDo}, an output
- * {@link TupleTag} should be instantiated with an extra {@code {}} so
- * it is an instance of an anonymous subclass without generic type
- * parameters. Input {@link TupleTag TupleTags} require no such extra
- * instantiation (although it doesn't hurt). For example:
- *
- * <pre> {@code
- * TupleTag<SomeType> inputTag = new TupleTag<>();
- * TupleTag<SomeOtherType> outputTag = new TupleTag<SomeOtherType>(){};
- * } </pre>
- *
- * @param <V> the type of the elements or values of the tagged thing,
- * e.g., a {@code PCollection<V>}.
- */
-public class TupleTag<V> implements Serializable {
-  /**
-   * Constructs a new {@code TupleTag} whose id is freshly generated.
-   *
-   * <p>This is the normal way {@code TupleTag}s are constructed.
-   */
-  public TupleTag() {
-    this(genId(), true);
-  }
-
-  /**
-   * Constructs a new {@code TupleTag} with the given id.
-   *
-   * <p>It is up to the user to ensure that two {@code TupleTag}s
-   * with the same id actually mean the same tag and carry the same
-   * generic type parameter. Violating this invariant can lead to
-   * hard-to-diagnose runtime type errors. Consequently, this
-   * operation should be used very sparingly, such as when the
-   * producer and consumer of {@code TupleTag}s are written in
-   * separate modules and can only coordinate via ids rather than
-   * shared {@code TupleTag} instances. Most of the time,
-   * {@link #TupleTag()} should be preferred.
-   */
-  public TupleTag(String id) {
-    this(id, false);
-  }
-
-  /**
-   * Returns the id of this {@code TupleTag}.
-   *
-   * <p>Two {@code TupleTag}s with the same id are considered equal.
-   *
-   * <p>{@code TupleTag}s are not ordered, i.e., the class does not implement
-   * the Comparable interface. They do implement equals and hashCode (both based
-   * on the id), making them suitable for use as keys in HashMap and HashSet.
-   */
-  public String getId() {
-    return id;
-  }
-
-  /**
-   * If this {@code TupleTag} is tagging output {@code outIndex} of
-   * a {@code PTransform}, returns the name that should be used by
-   * default for the output: {@code "out" + outIndex} for a tag with a
-   * generated id, and the id itself otherwise.
-   */
-  public String getOutName(int outIndex) {
-    return generated ? "out" + outIndex : id;
-  }
-
-  /**
-   * Returns a {@code TypeDescriptor} capturing what is known statically
-   * about the type of this {@code TupleTag} instance's most-derived
-   * class.
-   *
-   * <p>This is useful for a {@code TupleTag} constructed as an
-   * instance of an anonymous subclass with a trailing {@code {}},
-   * e.g., {@code new TupleTag<SomeType>(){}}.
-   */
-  public TypeDescriptor<V> getTypeDescriptor() {
-    return new TypeDescriptor<V>(getClass()) {};
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal details below here.
-
-  // Seeded with a constant, so the sequence of nonces is the same in every JVM.
-  static final Random RANDOM = new Random(0);
-  // Counts, per class name, how many ids were generated from that class's static initializer.
-  private static final Multiset<String> staticInits = HashMultiset.create();
-
-  final String id;
-  final boolean generated;
-
-  /** Generates and returns a fresh id for a {@code TupleTag}. */
-  static synchronized String genId() {
-    // Tags shared between the main program and workers are commonly kept in
-    // static variables, and such references are not serialized as part of the
-    // *Fns state. Fortunately, most such tags are created in static class
-    // initializers, e.g.
-    //
-    //   static final TupleTag<T> MY_TAG = new TupleTag<>();
-    //
-    // and class initialization order is well defined by the JVM spec, so in
-    // that case a deterministic id can be assigned.
-    final StackTraceElement[] frames = Thread.currentThread().getStackTrace();
-    for (int i = 0; i < frames.length; i++) {
-      StackTraceElement frame = frames[i];
-      if (frame.getMethodName().equals("<clinit>")) {
-        String initializingClass = frame.getClassName();
-        // Multiset.add returns the count from before this addition.
-        int priorCount = staticInits.add(initializingClass, 1);
-        return initializingClass + "#" + priorCount;
-      }
-    }
-    // Otherwise, assume the tag will be serialized and pick a pseudo-random
-    // value to reduce the chance of collision.
-    String nonce = Long.toHexString(RANDOM.nextLong());
-    // Expected layout: [Thread.getStackTrace, TupleTag.genId, TupleTag.<init>, caller, ...]
-    String caller;
-    if (frames.length >= 4) {
-      StackTraceElement callerFrame = frames[3];
-      caller = callerFrame.getClassName() + "." + callerFrame.getMethodName()
-          + ":" + callerFrame.getLineNumber();
-    } else {
-      caller = "unknown";
-    }
-    return caller + "#" + nonce;
-  }
-
-  /** JSON factory that rebuilds a tag from its id and its generated flag. */
-  @JsonCreator
-  @SuppressWarnings("unused")
-  private static TupleTag<?> fromJson(
-      @JsonProperty(PropertyNames.VALUE) String id,
-      @JsonProperty(PropertyNames.IS_GENERATED) boolean generated) {
-    return new TupleTag<>(id, generated);
-  }
-
-  private TupleTag(String id, boolean generated) {
-    this.id = id;
-    this.generated = generated;
-  }
-
-  /**
-   * Returns a {@link CloudObject} for this tag's class that carries the id and
-   * the generated flag.
-   */
-  public CloudObject asCloudObject() {
-    CloudObject cloudObject = CloudObject.forClass(getClass());
-    addString(cloudObject, PropertyNames.VALUE, id);
-    addBoolean(cloudObject, PropertyNames.IS_GENERATED, generated);
-    return cloudObject;
-  }
-
-  /** Tags are equal exactly when the other object is a {@code TupleTag} with an equal id. */
-  @Override
-  public boolean equals(Object that) {
-    return that instanceof TupleTag
-        && this.id.equals(((TupleTag<?>) that).id);
-  }
-
-  @Override
-  public int hashCode() {
-    return id.hashCode();
-  }
-
-  @Override
-  public String toString() {
-    return "Tag<" + id + ">";
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TupleTagList.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TupleTagList.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TupleTagList.java
deleted file mode 100644
index f019fc2..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TupleTagList.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.common.collect.ImmutableList;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * A {@link TupleTagList} is an immutable list of heterogeneously
- * typed {@link TupleTag TupleTags}. A {@link TupleTagList} is used, for instance, to
- * specify the tags of the side outputs of a
- * {@link ParDo}.
- *
- * <p>A {@link TupleTagList} can be created and accessed as follows:
- * <pre> {@code
- * TupleTag<String> tag1 = ...;
- * TupleTag<Integer> tag2 = ...;
- * TupleTag<Iterable<String>> tag3 = ...;
- *
- * // Create a TupleTagList with three TupleTags:
- * TupleTagList tags = TupleTagList.of(tag1).and(tag2).and(tag3);
- *
- * // Create an empty TupleTagList:
- * TupleTagList tags2 = TupleTagList.empty();
- *
- * // Get TupleTags out of a TupleTagList, by index (origin 0):
- * TupleTag<?> tagX = tags.get(1);
- * TupleTag<?> tagY = tags.get(0);
- * TupleTag<?> tagZ = tags.get(2);
- *
- * // Get a list of all TupleTags in a TupleTagList:
- * List<TupleTag<?>> allTags = tags.getAll();
- * } </pre>
- */
-public class TupleTagList implements Serializable {
-  /**
-   * Returns a {@link TupleTagList} with no tags in it.
-   *
-   * <p>Longer {@link TupleTagList TupleTagLists} can be created by calling
-   * {@link #and} on the result.
-   */
-  public static TupleTagList empty() {
-    return new TupleTagList();
-  }
-
-  /**
-   * Returns a {@link TupleTagList} whose only element is the given {@link TupleTag}.
-   *
-   * <p>Longer {@link TupleTagList TupleTagLists} can be created by calling
-   * {@link #and} on the result.
-   */
-  public static TupleTagList of(TupleTag<?> tag) {
-    TupleTagList start = empty();
-    return start.and(tag);
-  }
-
-  /**
-   * Returns a {@link TupleTagList} containing the given {@link TupleTag TupleTags}, in order.
-   *
-   * <p>Longer {@link TupleTagList TupleTagLists} can be created by calling
-   * {@link #and} on the result.
-   */
-  public static TupleTagList of(List<TupleTag<?>> tags) {
-    TupleTagList start = empty();
-    return start.and(tags);
-  }
-
-  /**
-   * Returns a new {@link TupleTagList} holding every {@link TupleTag} of this list
-   * followed by the given {@link TupleTag}. This list is left unchanged.
-   */
-  public TupleTagList and(TupleTag<?> tag) {
-    ImmutableList.Builder<TupleTag<?>> extended = ImmutableList.builder();
-    extended.addAll(tupleTags);
-    extended.add(tag);
-    return new TupleTagList(extended.build());
-  }
-
-  /**
-   * Returns a new {@link TupleTagList} holding every {@link TupleTag} of this list
-   * followed by the given {@link TupleTag TupleTags}, in order. This list is left
-   * unchanged.
-   */
-  public TupleTagList and(List<TupleTag<?>> tags) {
-    ImmutableList.Builder<TupleTag<?>> extended = ImmutableList.builder();
-    extended.addAll(tupleTags);
-    extended.addAll(tags);
-    return new TupleTagList(extended.build());
-  }
-
-  /**
-   * Returns how many TupleTags this TupleTagList contains.
-   */
-  public int size() {
-    return tupleTags.size();
-  }
-
-  /**
-   * Returns the {@link TupleTag} at the given index (origin zero).
-   *
-   * @throws IndexOutOfBoundsException if the index is out of the range
-   * {@code [0..size()-1]}.
-   */
-  public TupleTag<?> get(int index) {
-    return tupleTags.get(index);
-  }
-
-  /**
-   * Returns an unmodifiable List of all the {@link TupleTag TupleTags} in this
-   * {@link TupleTagList}.
-   */
-  public List<TupleTag<?>> getAll() {
-    return tupleTags;
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal details below here.
-
-  // Always an unmodifiable view; see the List-taking constructor.
-  final List<TupleTag<?>> tupleTags;
-
-  TupleTagList() {
-    this(new ArrayList<TupleTag<?>>());
-  }
-
-  TupleTagList(List<TupleTag<?>> tupleTags) {
-    // Wrap rather than copy: callers in this class hand over lists they do not keep.
-    this.tupleTags = Collections.unmodifiableList(tupleTags);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypeDescriptor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypeDescriptor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypeDescriptor.java
deleted file mode 100644
index 559d67c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypeDescriptor.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.common.collect.Lists;
-import com.google.common.reflect.Invokable;
-import com.google.common.reflect.Parameter;
-import com.google.common.reflect.TypeToken;
-
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * A description of a Java type, including actual generic parameters where possible.
- *
- * <p>To prevent losing actual type arguments due to erasure, create an anonymous subclass
- * with concrete types:
- * <pre>
- * {@code
- * TypeDescriptor<List<String>> descriptor = new TypeDescriptor<List<String>>() {};
- * }
- * </pre>
- *
- * <p>If the above were not an anonymous subclass, the type {@code List<String>}
- * would be erased and unavailable at run time.
- *
- * @param <T> the type represented by this {@link TypeDescriptor}
- */
-public abstract class TypeDescriptor<T> implements Serializable {
-
-  // This class is just a wrapper for TypeToken
-  private final TypeToken<T> token;
-
-  /**
-   * Creates a {@link TypeDescriptor} wrapping the provided token.
-   * This constructor is private so Guava types do not leak.
-   */
-  private TypeDescriptor(TypeToken<T> token) {
-    this.token = token;
-  }
-
-  /**
-   * Creates a {@link TypeDescriptor} representing
-   * the type parameter {@code T}. To use this constructor
-   * properly, the type parameter must be a concrete type, for example
-   * {@code new TypeDescriptor<List<String>>(){}}.
-   */
-  protected TypeDescriptor() {
-    token = new TypeToken<T>(getClass()) {};
-  }
-
-  /**
-   * Creates a {@link TypeDescriptor} representing the type parameter {@code T}, which should
-   * resolve to a concrete type in the context of the runtime class of {@code instance}.
-   *
-   * <p>Unlike {@link TypeDescriptor#TypeDescriptor(Class)} this will also use the contexts of
-   * the enclosing instances while attempting to resolve the type: if type variables remain
-   * after resolving against {@code instance}'s class, the values of the synthetic fields
-   * declared by that class are tried in turn until nothing is left unresolved or the fields
-   * are exhausted.
-   *
-   * @param instance the object whose class (and enclosing instances) provide the context
-   */
-  protected TypeDescriptor(Object instance) {
-    TypeToken<?> unresolvedToken = new TypeToken<T>(getClass()) {};
-
-    // While we haven't fully resolved the parameters, refine it using the captured
-    // enclosing instance of the object.
-    unresolvedToken = TypeToken.of(instance.getClass()).resolveType(unresolvedToken.getType());
-
-    if (hasUnresolvedParameters(unresolvedToken.getType())) {
-      for (Field field : instance.getClass().getDeclaredFields()) {
-        Object fieldInstance = getEnclosingInstance(field, instance);
-        if (fieldInstance != null) {
-          unresolvedToken =
-              TypeToken.of(fieldInstance.getClass()).resolveType(unresolvedToken.getType());
-          if (!hasUnresolvedParameters(unresolvedToken.getType())) {
-            break;
-          }
-        }
-      }
-    }
-
-    // Once we've either fully resolved the parameters or exhausted enclosing instances, we have
-    // the best approximation to the token we can get.
-    @SuppressWarnings("unchecked")
-    TypeToken<T> typedToken = (TypeToken<T>) unresolvedToken;
-    token = typedToken;
-  }
-
-  /**
-   * Returns true if {@code type} is a type variable, or a parameterized type with a
-   * type variable anywhere among its (recursively inspected) actual type arguments.
-   * Any other kind of {@link Type} is reported as resolved.
-   */
-  private boolean hasUnresolvedParameters(Type type) {
-    if (type instanceof TypeVariable) {
-      return true;
-    } else if (type instanceof ParameterizedType) {
-      ParameterizedType param = (ParameterizedType) type;
-      for (Type arg : param.getActualTypeArguments()) {
-        if (hasUnresolvedParameters(arg)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Returns the enclosing instance if the field is synthetic and it is able to access it, or
-   * {@literal null} if not.
-   */
-  @Nullable
-  private Object getEnclosingInstance(Field field, Object instance) {
-    if (!field.isSynthetic()) {
-      return null;
-    }
-
-    // Remember the original accessibility so it can be restored in the finally block.
-    boolean accessible = field.isAccessible();
-    try {
-      field.setAccessible(true);
-      return field.get(instance);
-    } catch (IllegalArgumentException | IllegalAccessException e) {
-      // If we fail to get the enclosing instance field, do nothing. In the worst case, we won't
-      // refine the type based on information in this enclosing class -- that is consistent with
-      // previous behavior and is still a correct answer that can be fixed by returning the correct
-      // type descriptor.
-      return null;
-    } finally {
-      field.setAccessible(accessible);
-    }
-  }
-
-  /**
-   * Creates a {@link TypeDescriptor} representing the type parameter
-   * {@code T}, which should resolve to a concrete type in the context
-   * of the class {@code clazz}.
-   */
-  @SuppressWarnings("unchecked")
-  protected TypeDescriptor(Class<?> clazz) {
-    TypeToken<T> unresolvedToken = new TypeToken<T>(getClass()) {};
-    token = (TypeToken<T>) TypeToken.of(clazz).resolveType(unresolvedToken.getType());
-  }
-
-  /**
-   * Returns a {@link TypeDescriptor} representing the given type.
-   */
-  public static <T> TypeDescriptor<T> of(Class<T> type) {
-    return new SimpleTypeDescriptor<>(TypeToken.<T>of(type));
-  }
-
-  /**
-   * Returns a {@link TypeDescriptor} representing the given type.
-   */
-  @SuppressWarnings("unchecked")
-  public static TypeDescriptor<?> of(Type type) {
-    return new SimpleTypeDescriptor<>((TypeToken<Object>) TypeToken.of(type));
-  }
-
-  /**
-   * Returns the {@link Type} represented by this {@link TypeDescriptor}.
-   */
-  public Type getType() {
-    return token.getType();
-  }
-
-  /**
-   * Returns the {@link Class} underlying the {@link Type} represented by
-   * this {@link TypeDescriptor}.
-   */
-  public Class<? super T> getRawType() {
-    return token.getRawType();
-  }
-
-  /**
-   * Returns the component type if this type is an array type,
-   * otherwise returns {@code null}.
-   */
-  @Nullable
-  public TypeDescriptor<?> getComponentType() {
-    // The underlying token reports "not an array" as null; pass that through instead of
-    // wrapping it, since a descriptor around a null token would fail on first use.
-    TypeToken<?> componentToken = token.getComponentType();
-    if (componentToken == null) {
-      return null;
-    }
-    return new SimpleTypeDescriptor<>(componentToken);
-  }
-
-  /**
-   * Returns the generic form of a supertype.
-   */
-  public final TypeDescriptor<? super T> getSupertype(Class<? super T> superclass) {
-    return new SimpleTypeDescriptor<>(token.getSupertype(superclass));
-  }
-
-  /**
-   * Returns true if this type is known to be an array type.
-   */
-  public final boolean isArray() {
-    return token.isArray();
-  }
-
-  /**
-   * Returns a {@link TypeVariable} for the named type parameter. Throws
-   * {@link IllegalArgumentException} if a type variable by the requested type parameter is not
-   * found.
-   *
-   * <p>For example, {@code new TypeDescriptor<List>(){}.getTypeParameter("E")} returns a
-   * {@code TypeVariable<? super List>} representing the formal type parameter {@code E}.
-   *
-   * <p>Do not mistake the type parameters (formal type argument list) with the actual
-   * type arguments. For example, if a class {@code Foo} extends {@code List<String>}, it
-   * does not make sense to ask for a type parameter, because {@code Foo} does not have any.
-   *
-   * @param paramName the name of the formal type parameter to look up on the raw type
-   * @throws IllegalArgumentException if the raw type declares no parameter with that name
-   */
-  public final TypeVariable<Class<? super T>> getTypeParameter(String paramName) {
-    // The raw type is held as Class<?>, so its type parameters are only known as
-    // TypeVariable<?> here; the matching one is cast to the declared return type.
-    Class<?> rawType = getRawType();
-    for (TypeVariable<?> param : rawType.getTypeParameters()) {
-      if (param.getName().equals(paramName)) {
-        @SuppressWarnings("unchecked")
-        TypeVariable<Class<? super T>> typedParam = (TypeVariable<Class<? super T>>) param;
-        return typedParam;
-      }
-    }
-    throw new IllegalArgumentException(
-        "No type parameter named " + paramName + " found on " + getRawType());
-  }
-
-  /**
-   * Returns true if this type is assignable from the given type.
-   */
-  public final boolean isSupertypeOf(TypeDescriptor<?> source) {
-    return token.isSupertypeOf(source.token);
-  }
-
-  /**
-   * Return true if this type is a subtype of the given type.
-   */
-  public final boolean isSubtypeOf(TypeDescriptor<?> parent) {
-    return token.isSubtypeOf(parent.token);
-  }
-
-  /**
-   * Returns a list of argument types for the given method, which must
-   * be a part of the class.
-   */
-  public List<TypeDescriptor<?>> getArgumentTypes(Method method) {
-    Invokable<?, ?> typedMethod = token.method(method);
-
-    List<TypeDescriptor<?>> argTypes = Lists.newArrayList();
-    for (Parameter parameter : typedMethod.getParameters()) {
-      argTypes.add(new SimpleTypeDescriptor<>(parameter.getType()));
-    }
-    return argTypes;
-  }
-
-  /**
-   * Returns a {@link TypeDescriptor} representing the given
-   * type, with type variables resolved according to the specialization
-   * in this type.
-   *
-   * <p>For example, consider the following class:
-   * <pre>
-   * {@code
-   * class MyList implements List<String> { ... }
-   * }
-   * </pre>
-   *
-   * <p>The {@link TypeDescriptor} returned by
-   * <pre>
-   * {@code
-   * TypeDescriptor.of(MyList.class)
-   *     .resolveType(MyList.class.getMethod("get", int.class).getGenericReturnType())
-   * }
-   * </pre>
-   * will represent the type {@code String}.
-   */
-  public TypeDescriptor<?> resolveType(Type type) {
-    return new SimpleTypeDescriptor<>(token.resolveType(type));
-  }
-
-  /**
-   * Returns {@link TypeDescriptor}s, one for each
-   * interface implemented by this class.
-   */
-  @SuppressWarnings("rawtypes")
-  public Iterable<TypeDescriptor> getInterfaces() {
-    List<TypeDescriptor> interfaces = Lists.newArrayList();
-    for (TypeToken<?> interfaceToken : token.getTypes().interfaces()) {
-      interfaces.add(new SimpleTypeDescriptor<>(interfaceToken));
-    }
-    return interfaces;
-  }
-
-  /**
-   * Returns {@link TypeDescriptor}s, one for each
-   * superclass (including this class).
-   */
-  @SuppressWarnings("rawtypes")
-  public Iterable<TypeDescriptor> getClasses() {
-    List<TypeDescriptor> classes = Lists.newArrayList();
-    for (TypeToken<?> classToken : token.getTypes().classes()) {
-      classes.add(new SimpleTypeDescriptor<>(classToken));
-    }
-    return classes;
-  }
-
-  @Override
-  public String toString() {
-    return token.toString();
-  }
-
-  /**
-   * Two type descriptors are equal if and only if they
-   * represent the same type.
-   */
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof TypeDescriptor)) {
-      return false;
-    } else {
-      @SuppressWarnings("unchecked")
-      TypeDescriptor<?> descriptor = (TypeDescriptor<?>) other;
-      return token.equals(descriptor.token);
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    return token.hashCode();
-  }
-
-  /**
-   * A non-abstract {@link TypeDescriptor} for construction directly from an existing
-   * {@link TypeToken}.
-   */
-  private static final class SimpleTypeDescriptor<T> extends TypeDescriptor<T> {
-    SimpleTypeDescriptor(TypeToken<T> typeToken) {
-      super(typeToken);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypedPValue.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypedPValue.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypedPValue.java
deleted file mode 100644
index 29fd639..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypedPValue.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.values;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException.ReasonCode;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-
-/**
- * A {@link TypedPValue TypedPValue<T>} is the abstract base class of things that
- * store some number of values of type {@code T}.
- *
- * <p>Because we know the type {@code T}, this is the layer of the inheritance hierarchy where
- * we store a coder for objects of type {@code T}.
- *
- * @param <T> the type of the values stored in this {@link TypedPValue}
- */
-public abstract class TypedPValue<T> extends PValueBase implements PValue {
-
- /**
- * Returns the {@link Coder} used by this {@link TypedPValue} to encode and decode
- * the values stored in it.
- *
- * @throws IllegalStateException if the {@link Coder} hasn't been set, and
- * couldn't be inferred.
- */
- public Coder<T> getCoder() {
- if (coder == null) {
- coder = inferCoderOrFail();
- }
- return coder;
- }
-
- /**
- * Sets the {@link Coder} used by this {@link TypedPValue} to encode and decode the
- * values stored in it. Returns {@code this}.
- *
- * @throws IllegalStateException if this {@link TypedPValue} has already
- * been finalized and is no longer settable, e.g., by having
- * {@code apply()} called on it
- */
- public TypedPValue<T> setCoder(Coder<T> coder) {
- if (isFinishedSpecifyingInternal()) {
- throw new IllegalStateException(
- "cannot change the Coder of " + this + " once it's been used");
- }
- if (coder == null) {
- throw new IllegalArgumentException(
- "Cannot setCoder(null)");
- }
- this.coder = coder;
- return this;
- }
-
- /**
- * After building, finalizes this {@link PValue} to make it ready for
- * running. Automatically invoked whenever the {@link PValue} is "used"
- * (e.g., when apply() is called on it) and when the Pipeline is
- * run (useful if this is a {@link PValue} with no consumers).
- */
- @Override
- public void finishSpecifying() {
- if (isFinishedSpecifyingInternal()) {
- return;
- }
- super.finishSpecifying();
- // Ensure that this TypedPValue has a coder by inferring the coder if none exists; If not,
- // this will throw an exception.
- getCoder();
- }
-
- /////////////////////////////////////////////////////////////////////////////
- // Internal details below here.
-
- /**
- * The {@link Coder} used by this {@link TypedPValue} to encode and decode the
- * values stored in it, or null if not specified nor inferred yet.
- */
- private Coder<T> coder;
-
- protected TypedPValue(Pipeline p) {
- super(p);
- }
-
- private TypeDescriptor<T> typeDescriptor;
-
- /**
- * Returns a {@link TypeDescriptor TypeDescriptor<T>} with some reflective information
- * about {@code T}, if possible. May return {@code null} if no information
- * is available. Subclasses may override this to enable better
- * {@code Coder} inference.
- */
- public TypeDescriptor<T> getTypeDescriptor() {
- return typeDescriptor;
- }
-
- /**
- * Sets the {@link TypeDescriptor TypeDescriptor<T>} associated with this class. Better
- * reflective type information will lead to better {@link Coder}
- * inference.
- */
- public TypedPValue<T> setTypeDescriptorInternal(TypeDescriptor<T> typeDescriptor) {
- this.typeDescriptor = typeDescriptor;
- return this;
- }
-
- /**
- * If the coder is not explicitly set, this sets the coder for
- * this {@link TypedPValue} to the best coder that can be inferred
- * based upon the known {@link TypeDescriptor}. By default, this is null,
- * but can and should be improved by subclasses.
- */
- @SuppressWarnings({"unchecked", "rawtypes"})
- private Coder<T> inferCoderOrFail() {
- // First option for a coder: use the Coder set on this PValue.
- if (coder != null) {
- return coder;
- }
-
- AppliedPTransform<?, ?, ?> application = getProducingTransformInternal();
-
- // Second option for a coder: Look in the coder registry.
- CoderRegistry registry = getPipeline().getCoderRegistry();
- TypeDescriptor<T> token = getTypeDescriptor();
- CannotProvideCoderException inferFromTokenException = null;
- if (token != null) {
- try {
- return registry.getDefaultCoder(token);
- } catch (CannotProvideCoderException exc) {
- inferFromTokenException = exc;
- // Attempt to detect when the token came from a TupleTag used for a ParDo side output,
- // and provide a better error message if so. Unfortunately, this information is not
- // directly available from the TypeDescriptor, so infer based on the type of the PTransform
- // and the error message itself.
- if (application.getTransform() instanceof ParDo.BoundMulti
- && exc.getReason() == ReasonCode.TYPE_ERASURE) {
- inferFromTokenException = new CannotProvideCoderException(exc.getMessage()
- + " If this error occurs for a side output of the producing ParDo, verify that the "
- + "TupleTag for this output is constructed with proper type information (see "
- + "TupleTag Javadoc) or explicitly set the Coder to use if this is not possible.");
- }
- }
- }
-
- // Third option for a coder: use the default Coder from the producing PTransform.
- CannotProvideCoderException inputCoderException;
- try {
- return ((PTransform) application.getTransform()).getDefaultOutputCoder(
- application.getInput(), this);
- } catch (CannotProvideCoderException exc) {
- inputCoderException = exc;
- }
-
- // Build up the error message and list of causes.
- StringBuilder messageBuilder = new StringBuilder()
- .append("Unable to return a default Coder for ").append(this)
- .append(". Correct one of the following root causes:");
-
- // No exception, but give the user a message about .setCoder() has not been called.
- messageBuilder.append("\n No Coder has been manually specified; ")
- .append(" you may do so using .setCoder().");
-
- if (inferFromTokenException != null) {
- messageBuilder
- .append("\n Inferring a Coder from the CoderRegistry failed: ")
- .append(inferFromTokenException.getMessage());
- }
-
- if (inputCoderException != null) {
- messageBuilder
- .append("\n Using the default output Coder from the producing PTransform failed: ")
- .append(inputCoderException.getMessage());
- }
-
- // Build and throw the exception.
- throw new IllegalStateException(messageBuilder.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/package-info.java
deleted file mode 100644
index b8ca756..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/package-info.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines {@link com.google.cloud.dataflow.sdk.values.PCollection} and other classes for
- * representing data in a {@link com.google.cloud.dataflow.sdk.Pipeline}.
- *
- * <p>In particular, see these collection abstractions:
- *
- * <ul>
- * <li>{@link com.google.cloud.dataflow.sdk.values.PCollection} - an immutable collection of
- * values of type {@code T} and the main representation for data in Dataflow.</li>
- * <li>{@link com.google.cloud.dataflow.sdk.values.PCollectionView} - an immutable view of a
- * {@link com.google.cloud.dataflow.sdk.values.PCollection} that can be accessed as a
- * side input of a {@link com.google.cloud.dataflow.sdk.transforms.ParDo}
- * {@link com.google.cloud.dataflow.sdk.transforms.PTransform}.</li>
- * <li>{@link com.google.cloud.dataflow.sdk.values.PCollectionTuple} - a heterogeneous tuple of
- * {@link com.google.cloud.dataflow.sdk.values.PCollection PCollections}
- * used in cases where a {@link com.google.cloud.dataflow.sdk.transforms.PTransform} takes
- * or returns multiple
- * {@link com.google.cloud.dataflow.sdk.values.PCollection PCollections}.</li>
- * <li>{@link com.google.cloud.dataflow.sdk.values.PCollectionList} - a homogeneous list of
- * {@link com.google.cloud.dataflow.sdk.values.PCollection PCollections} used, for example,
- * as input to {@link com.google.cloud.dataflow.sdk.transforms.Flatten}.</li>
- * </ul>
- *
- * <p>And these classes for individual values play particular roles in Dataflow:
- *
- * <ul>
- * <li>{@link com.google.cloud.dataflow.sdk.values.KV} - a key/value pair that is used by
- * keyed transforms, most notably {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}.
- * </li>
- * <li>{@link com.google.cloud.dataflow.sdk.values.TimestampedValue} - a timestamp/value pair
- * that is used for windowing and handling out-of-order data in streaming execution.</li>
- * </ul>
- *
- * <p>For further details, see the documentation for each class in this package.
- */
-package com.google.cloud.dataflow.sdk.values;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/proto/README.md
----------------------------------------------------------------------
diff --git a/sdk/src/main/proto/README.md b/sdk/src/main/proto/README.md
deleted file mode 100644
index fa4e925..0000000
--- a/sdk/src/main/proto/README.md
+++ /dev/null
@@ -1,27 +0,0 @@
-## Protocol Buffers in Google Cloud Dataflow
-
-This directory contains the Protocol Buffer messages used in Google Cloud
-Dataflow.
-
-They aren't, however, used during the Maven build process, and are included here
-for completeness only. Instead, the following artifact on Maven Central contains
-the binary version of the generated code from these Protocol Buffers:
-
- <dependency>
- <groupId>com.google.cloud.dataflow</groupId>
- <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
- <version>LATEST</version>
- </dependency>
-
-Please follow this process for testing changes:
-
-* Make changes to the Protocol Buffer messages in this directory.
-* Use `protoc` to generate the new code, and compile it into a new Java library.
-* Install that Java library into your local Maven repository.
-* Update SDK's `pom.xml` to pick up the newly installed library, instead of
-downloading it from Maven Central.
-
-Once the changes are ready for submission, please separate them into two
-commits. The first commit should update the Protocol Buffer messages only. After
-that, we need to update the generated artifact on Maven Central. Finally,
-changes that make use of the Protocol Buffer changes may be committed.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/proto/proto2_coder_test_messages.proto
----------------------------------------------------------------------
diff --git a/sdk/src/main/proto/proto2_coder_test_messages.proto b/sdk/src/main/proto/proto2_coder_test_messages.proto
deleted file mode 100644
index eb3c3df..0000000
--- a/sdk/src/main/proto/proto2_coder_test_messages.proto
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/*
- * Protocol Buffer messages used for testing Proto2Coder implementation.
- */
-
-syntax = "proto2";
-
-package proto2_coder_test_messages;
-
-option java_package = "com.google.cloud.dataflow.sdk.coders";
-
-message MessageA {
- optional string field1 = 1;
- repeated MessageB field2 = 2;
-}
-
-message MessageB {
- optional bool field1 = 1;
-}
-
-message MessageC {
- extensions 100 to 105;
-}
-
-extend MessageC {
- optional MessageA field1 = 101;
- optional MessageB field2 = 102;
-}
-
-message MessageWithMap {
- map<string, MessageA> field1 = 1;
-}
-
-message ReferencesMessageWithMap {
- repeated MessageWithMap field1 = 1;
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/resources/com/google/cloud/dataflow/sdk/sdk.properties
----------------------------------------------------------------------
diff --git a/sdk/src/main/resources/com/google/cloud/dataflow/sdk/sdk.properties b/sdk/src/main/resources/com/google/cloud/dataflow/sdk/sdk.properties
deleted file mode 100644
index 5b0a720..0000000
--- a/sdk/src/main/resources/com/google/cloud/dataflow/sdk/sdk.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-# SDK source version.
-version=${pom.version}
-
-build.date=${timestamp}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/test/java/com/google/cloud/dataflow/sdk/DataflowMatchers.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/DataflowMatchers.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/DataflowMatchers.java
deleted file mode 100644
index ad21072..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/DataflowMatchers.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk;
-
-import com.google.protobuf.ByteString;
-
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
-
-import java.io.Serializable;
-
-/**
- * Matchers that are useful when writing Dataflow tests.
- */
-public class DataflowMatchers {
- /**
- * Matcher for {@link ByteString} that prints the strings in UTF8.
- */
- public static class ByteStringMatcher extends TypeSafeMatcher<ByteString>
- implements Serializable {
- private ByteString expected;
- private ByteStringMatcher(ByteString expected) {
- this.expected = expected;
- }
-
- public static ByteStringMatcher byteStringEq(ByteString expected) {
- return new ByteStringMatcher(expected);
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("ByteString(")
- .appendText(expected.toStringUtf8())
- .appendText(")");
- }
-
- @Override
- public void describeMismatchSafely(ByteString actual, Description description) {
- description
- .appendText("was ByteString(")
- .appendText(actual.toStringUtf8())
- .appendText(")");
- }
-
- @Override
- protected boolean matchesSafely(ByteString actual) {
- return actual.equals(expected);
- }
- }
-}