You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/05 00:04:23 UTC
[17/19] beam git commit: Move some PCollectionView bits out of util
Move some PCollectionView bits out of util
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c83cc744
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c83cc744
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c83cc744
Branch: refs/heads/master
Commit: c83cc744a69f735ac134471705e3403b9d5edd34
Parents: b2553ca
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 3 20:34:54 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 16:06:56 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 2 +-
.../construction/PTransformMatchersTest.java | 2 +-
.../runners/direct/PCollectionViewWindow.java | 67 +++
.../beam/runners/direct/SideInputContainer.java | 1 -
.../runners/direct/SideInputContainerTest.java | 2 +-
.../direct/ViewEvaluatorFactoryTest.java | 2 +-
.../runners/direct/ViewOverrideFactoryTest.java | 2 +-
.../direct/WriteWithShardingFactoryTest.java | 2 +-
.../flink/FlinkStreamingViewOverrides.java | 2 +-
.../runners/dataflow/BatchViewOverrides.java | 2 +-
.../org/apache/beam/sdk/transforms/Combine.java | 2 +-
.../org/apache/beam/sdk/transforms/View.java | 2 +-
.../beam/sdk/util/PCollectionViewWindow.java | 67 ---
.../apache/beam/sdk/util/PCollectionViews.java | 497 -------------------
.../beam/sdk/values/PCollectionViews.java | 495 ++++++++++++++++++
.../beam/sdk/transforms/DoFnTesterTest.java | 2 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 +-
17 files changed, 574 insertions(+), 577 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 1c845c6..e1828c3 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -60,9 +60,9 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.AsIterable;
import org.apache.beam.sdk.transforms.View.AsSingleton;
-import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.hadoop.conf.Configuration;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index cb28c34..6271234 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -60,13 +60,13 @@ import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWindow.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWindow.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWindow.java
new file mode 100644
index 0000000..7a7d8ff
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWindow.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A pair of a {@link PCollectionView} and a {@link BoundedWindow}, which can
+ * be thought of as window "of" the view. This is a value class for use e.g.
+ * as a compound cache key.
+ *
+ * @param <T> the type of the underlying PCollectionView
+ */
+public final class PCollectionViewWindow<T> {
+
+ private final PCollectionView<T> view;
+ private final BoundedWindow window;
+
+ private PCollectionViewWindow(PCollectionView<T> view, BoundedWindow window) {
+ this.view = view;
+ this.window = window;
+ }
+
+ public static <T> PCollectionViewWindow<T> of(PCollectionView<T> view, BoundedWindow window) {
+ return new PCollectionViewWindow<>(view, window);
+ }
+
+ public PCollectionView<T> getView() {
+ return view;
+ }
+
+ public BoundedWindow getWindow() {
+ return window;
+ }
+
+ @Override
+ public boolean equals(Object otherObject) {
+ if (!(otherObject instanceof PCollectionViewWindow)) {
+ return false;
+ }
+ @SuppressWarnings("unchecked")
+ PCollectionViewWindow<T> other = (PCollectionViewWindow<T>) otherObject;
+ return getView().equals(other.getView()) && getWindow().equals(other.getWindow());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getView(), getWindow());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
index 380dc65..43da92f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
@@ -39,7 +39,6 @@ import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.PCollectionViewWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
index d4ca9fd..5e7c799 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -45,11 +45,11 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 9560e94..d8869b2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -34,10 +34,10 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index a36787a..eda00a7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -39,9 +39,9 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Matchers;
import org.junit.Rule;
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 6fffd1a..a2b0c5c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -54,9 +54,9 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
index f955f2a..ce1c895 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
@@ -30,10 +30,10 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
/**
* Flink streaming overrides for various view (side input) transforms.
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 1ff8a3f..debaf59 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -74,7 +74,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -84,6 +83,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 1be948f..666db3b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -57,13 +57,13 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.util.NameUtils.NameOverride;
-import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index d17d423..d7b8145 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -25,10 +25,10 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
/**
* Transforms for creating {@link PCollectionView PCollectionViews} from
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViewWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViewWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViewWindow.java
deleted file mode 100644
index 410c8ce..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViewWindow.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.Objects;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * A pair of a {@link PCollectionView} and a {@link BoundedWindow}, which can
- * be thought of as window "of" the view. This is a value class for use e.g.
- * as a compound cache key.
- *
- * @param <T> the type of the underlying PCollectionView
- */
-public final class PCollectionViewWindow<T> {
-
- private final PCollectionView<T> view;
- private final BoundedWindow window;
-
- private PCollectionViewWindow(PCollectionView<T> view, BoundedWindow window) {
- this.view = view;
- this.window = window;
- }
-
- public static <T> PCollectionViewWindow<T> of(PCollectionView<T> view, BoundedWindow window) {
- return new PCollectionViewWindow<>(view, window);
- }
-
- public PCollectionView<T> getView() {
- return view;
- }
-
- public BoundedWindow getWindow() {
- return window;
- }
-
- @Override
- public boolean equals(Object otherObject) {
- if (!(otherObject instanceof PCollectionViewWindow)) {
- return false;
- }
- @SuppressWarnings("unchecked")
- PCollectionViewWindow<T> other = (PCollectionViewWindow<T>) otherObject;
- return getView().equals(other.getView()) && getWindow().equals(other.getWindow());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getView(), getWindow());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
deleted file mode 100644
index a07bc5e..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.transforms.Materialization;
-import org.apache.beam.sdk.transforms.Materializations;
-import org.apache.beam.sdk.transforms.ViewFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValueBase;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * Implementations of {@link PCollectionView} shared across the SDK.
- *
- * <p>For internal use only, subject to change.
- */
-public class PCollectionViews {
-
- /**
- * Returns a {@code PCollectionView<T>} capable of processing elements encoded using the provided
- * {@link Coder} and windowed using the provided * {@link WindowingStrategy}.
- *
- * <p>If {@code hasDefault} is {@code true}, then the view will take on the value
- * {@code defaultValue} for any empty windows.
- */
- public static <T, W extends BoundedWindow> PCollectionView<T> singletonView(
- PCollection<T> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- boolean hasDefault,
- @Nullable T defaultValue,
- Coder<T> valueCoder) {
- return new SimplePCollectionView<>(
- pCollection,
- new SingletonViewFn<>(hasDefault, defaultValue, valueCoder),
- windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
- }
-
- /**
- * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements encoded using the
- * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
- */
- public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView(
- PCollection<T> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<T> valueCoder) {
- return new SimplePCollectionView<>(
- pCollection,
- new IterableViewFn<T>(),
- windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
- }
-
- /**
- * Returns a {@code PCollectionView<List<T>>} capable of processing elements encoded using the
- * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
- */
- public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
- PCollection<T> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<T> valueCoder) {
- return new SimplePCollectionView<>(
- pCollection,
- new ListViewFn<T>(),
- windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
- }
-
- /**
- * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements encoded using the
- * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
- */
- public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView(
- PCollection<KV<K, V>> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<KV<K, V>> valueCoder) {
- return new SimplePCollectionView<>(
- pCollection,
- new MapViewFn<K, V>(),
- windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
- }
-
- /**
- * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements encoded
- * using the provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
- */
- public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView(
- PCollection<KV<K, V>> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<KV<K, V>> valueCoder) {
- return new SimplePCollectionView<>(
- pCollection,
- new MultimapViewFn<K, V>(),
- windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
- }
-
- /**
- * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}.
- *
- * <p>For internal use only.
- *
- * <p>Instantiate via {@link PCollectionViews#singletonView}.
- *
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
- */
- @Deprecated
- @Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class SingletonViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, T> {
- @Nullable private byte[] encodedDefaultValue;
- @Nullable private transient T defaultValue;
- @Nullable private Coder<T> valueCoder;
- private boolean hasDefault;
-
- private SingletonViewFn(boolean hasDefault, T defaultValue, Coder<T> valueCoder) {
- this.hasDefault = hasDefault;
- this.defaultValue = defaultValue;
- this.valueCoder = valueCoder;
- if (hasDefault) {
- try {
- this.encodedDefaultValue = CoderUtils.encodeToByteArray(valueCoder, defaultValue);
- } catch (IOException e) {
- throw new RuntimeException("Unexpected IOException: ", e);
- }
- }
- }
-
- /**
- * Returns the default value that was specified.
- *
- * <p>For internal use only.
- *
- * @throws NoSuchElementException if no default was specified.
- */
- public T getDefaultValue() {
- if (!hasDefault) {
- throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
- }
- // Lazily decode the default value once
- synchronized (this) {
- if (encodedDefaultValue != null && defaultValue == null) {
- try {
- defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue);
- } catch (IOException e) {
- throw new RuntimeException("Unexpected IOException: ", e);
- }
- }
- return defaultValue;
- }
- }
-
- @Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
- }
-
- @Override
- public T apply(Iterable<WindowedValue<T>> contents) {
- try {
- return Iterables.getOnlyElement(contents).getValue();
- } catch (NoSuchElementException exc) {
- return getDefaultValue();
- } catch (IllegalArgumentException exc) {
- throw new IllegalArgumentException(
- "PCollection with more than one element "
- + "accessed as a singleton view.");
- }
- }
- }
-
- /**
- * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code Iterable<T>}.
- *
- * <p>For internal use only.
- *
- * <p>Instantiate via {@link PCollectionViews#iterableView}.
- *
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
- */
- @Deprecated
- @Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class IterableViewFn<T>
- extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
- @Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
- }
-
- @Override
- public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
- return Iterables.unmodifiableIterable(
- Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
- @SuppressWarnings("unchecked")
- @Override
- public T apply(WindowedValue<T> input) {
- return input.getValue();
- }
- }));
- }
- }
-
- /**
- * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code List<T>}.
- *
- * <p>For internal use only.
- *
- * <p>Instantiate via {@link PCollectionViews#listView}.
- *
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
- */
- @Deprecated
- @Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, List<T>> {
- @Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
- }
-
- @Override
- public List<T> apply(Iterable<WindowedValue<T>> contents) {
- return ImmutableList.copyOf(
- Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
- @SuppressWarnings("unchecked")
- @Override
- public T apply(WindowedValue<T> input) {
- return input.getValue();
- }
- }));
- }
- }
-
- /**
- * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>}
- * to {@code Map<K, Iterable<V>>}.
- *
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
- */
- @Deprecated
- @Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class MultimapViewFn<K, V>
- extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>> {
- @Override
- public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() {
- return Materializations.iterable();
- }
-
- @Override
- public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
- Multimap<K, V> multimap = HashMultimap.create();
- for (WindowedValue<KV<K, V>> elem : elements) {
- KV<K, V> kv = elem.getValue();
- multimap.put(kv.getKey(), kv.getValue());
- }
- // Safe covariant cast that Java cannot express without rawtypes, even with unchecked casts
- @SuppressWarnings({"unchecked", "rawtypes"})
- Map<K, Iterable<V>> resultMap = (Map) multimap.asMap();
- return Collections.unmodifiableMap(resultMap);
- }
- }
-
- /**
- * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>} with one value per key to
- * {@code Map<K, V>}.
- *
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
- */
- @Deprecated
- @Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class MapViewFn<K, V> extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>> {
- @Override
- public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() {
- return Materializations.iterable();
- }
-
- /**
- * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}.
- */
- @Override
- public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
- Map<K, V> map = new HashMap<>();
- for (WindowedValue<KV<K, V>> elem : elements) {
- KV<K, V> kv = elem.getValue();
- if (map.containsKey(kv.getKey())) {
- throw new IllegalArgumentException("Duplicate values for " + kv.getKey());
- }
- map.put(kv.getKey(), kv.getValue());
- }
- return Collections.unmodifiableMap(map);
- }
- }
-
- /**
- * A class for {@link PCollectionView} implementations, with additional type parameters
- * that are not visible at pipeline assembly time when the view is used as a side input.
- *
- * <p>For internal use only.
- */
- public static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow>
- extends PValueBase
- implements PCollectionView<ViewT> {
- /** The {@link PCollection} this view was originally created from. */
- private transient PCollection<ElemT> pCollection;
-
- /** A unique tag for the view, typed according to the elements underlying the view. */
- private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
-
- private WindowMappingFn<W> windowMappingFn;
-
- /** The windowing strategy for the PCollection underlying the view. */
- private WindowingStrategy<?, W> windowingStrategy;
-
- /** The coder for the elements underlying the view. */
- private Coder<Iterable<WindowedValue<ElemT>>> coder;
-
- /**
- * The typed {@link ViewFn} for this view.
- */
- private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
-
- /**
- * Call this constructor to initialize the fields for which this base class provides
- * boilerplate accessors.
- */
- private SimplePCollectionView(
- PCollection<ElemT> pCollection,
- TupleTag<Iterable<WindowedValue<ElemT>>> tag,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
- WindowMappingFn<W> windowMappingFn,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<ElemT> valueCoder) {
- super(pCollection.getPipeline());
- this.pCollection = pCollection;
- if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
- throw new IllegalArgumentException("WindowFn of PCollectionView cannot be InvalidWindows");
- }
- this.windowMappingFn = windowMappingFn;
- this.tag = tag;
- this.windowingStrategy = windowingStrategy;
- this.viewFn = viewFn;
- this.coder =
- IterableCoder.of(WindowedValue.getFullCoder(
- valueCoder, windowingStrategy.getWindowFn().windowCoder()));
- }
-
- /**
- * Call this constructor to initialize the fields for which this base class provides
- * boilerplate accessors, with an auto-generated tag.
- */
- private SimplePCollectionView(
- PCollection<ElemT> pCollection,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
- WindowMappingFn<W> windowMappingFn,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<ElemT> valueCoder) {
- this(
- pCollection,
- new TupleTag<Iterable<WindowedValue<ElemT>>>(),
- viewFn,
- windowMappingFn,
- windowingStrategy,
- valueCoder);
- }
-
- /**
- * For serialization only. Do not use directly.
- */
- @SuppressWarnings("unused") // used for serialization
- protected SimplePCollectionView() {
- super();
- }
-
- @Override
- public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
- // Safe cast: it is required that the rest of the SDK maintain the invariant
- // that a PCollectionView is only provided an iterable for the elements of an
- // appropriately typed PCollection.
- @SuppressWarnings({"rawtypes", "unchecked"})
- ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn;
- return untypedViewFn;
- }
-
- @Override
- public WindowMappingFn<?> getWindowMappingFn() {
- return windowMappingFn;
- }
-
- @Override
- public PCollection<?> getPCollection() {
- return pCollection;
- }
-
- /**
- * Returns a unique {@link TupleTag} identifying this {@link PCollectionView}.
- *
- * <p>For internal use only by runner implementors.
- */
- @Override
- public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
- // Safe cast: It is required that the rest of the SDK maintain the invariant that
- // this tag is only used to access the contents of an appropriately typed underlying
- // PCollection
- @SuppressWarnings({"rawtypes", "unchecked"})
- TupleTag<Iterable<WindowedValue<?>>> untypedTag = (TupleTag) tag;
- return untypedTag;
- }
-
- /**
- * Returns the {@link WindowingStrategy} of this {@link PCollectionView}, which should
- * be that of the underlying {@link PCollection}.
- *
- * <p>For internal use only by runner implementors.
- */
- @Override
- public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
- return windowingStrategy;
- }
-
- @Override
- public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
- // Safe cast: It is required that the rest of the SDK only use this untyped coder
- // for the elements of an appropriately typed underlying PCollection.
- @SuppressWarnings({"rawtypes", "unchecked"})
- Coder<Iterable<WindowedValue<?>>> untypedCoder = (Coder) coder;
- return untypedCoder;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(tag);
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof PCollectionView)) {
- return false;
- }
- @SuppressWarnings("unchecked")
- PCollectionView<?> otherView = (PCollectionView<?>) other;
- return tag.equals(otherView.getTagInternal());
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this).add("tag", tag).toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
new file mode 100644
index 0000000..74887c7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.transforms.Materialization;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * <b>For internal use only; no backwards compatibility guarantees.</b>
+ *
+ * <p>Implementations of {@link PCollectionView} shared across the SDK.
+ */
+@Internal
+public class PCollectionViews {
+
+ /**
+ * Returns a {@code PCollectionView<T>} capable of processing elements encoded using the provided
+ * {@link Coder} and windowed using the provided * {@link WindowingStrategy}.
+ *
+ * <p>If {@code hasDefault} is {@code true}, then the view will take on the value
+ * {@code defaultValue} for any empty windows.
+ */
+ public static <T, W extends BoundedWindow> PCollectionView<T> singletonView(
+ PCollection<T> pCollection,
+ WindowingStrategy<?, W> windowingStrategy,
+ boolean hasDefault,
+ @Nullable T defaultValue,
+ Coder<T> valueCoder) {
+ return new SimplePCollectionView<>(
+ pCollection,
+ new SingletonViewFn<>(hasDefault, defaultValue, valueCoder),
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ windowingStrategy,
+ valueCoder);
+ }
+
+ /**
+ * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements encoded using the
+ * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+ */
+ public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView(
+ PCollection<T> pCollection,
+ WindowingStrategy<?, W> windowingStrategy,
+ Coder<T> valueCoder) {
+ return new SimplePCollectionView<>(
+ pCollection,
+ new IterableViewFn<T>(),
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ windowingStrategy,
+ valueCoder);
+ }
+
+ /**
+ * Returns a {@code PCollectionView<List<T>>} capable of processing elements encoded using the
+ * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+ */
+ public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
+ PCollection<T> pCollection,
+ WindowingStrategy<?, W> windowingStrategy,
+ Coder<T> valueCoder) {
+ return new SimplePCollectionView<>(
+ pCollection,
+ new ListViewFn<T>(),
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ windowingStrategy,
+ valueCoder);
+ }
+
+ /**
+ * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements encoded using the
+ * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+ */
+ public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView(
+ PCollection<KV<K, V>> pCollection,
+ WindowingStrategy<?, W> windowingStrategy,
+ Coder<KV<K, V>> valueCoder) {
+ return new SimplePCollectionView<>(
+ pCollection,
+ new MapViewFn<K, V>(),
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ windowingStrategy,
+ valueCoder);
+ }
+
+ /**
+ * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements encoded
+ * using the provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+ */
+ public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView(
+ PCollection<KV<K, V>> pCollection,
+ WindowingStrategy<?, W> windowingStrategy,
+ Coder<KV<K, V>> valueCoder) {
+ return new SimplePCollectionView<>(
+ pCollection,
+ new MultimapViewFn<K, V>(),
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ windowingStrategy,
+ valueCoder);
+ }
+
+ /**
+ * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}.
+ *
+ * <p>For internal use only.
+ *
+ * <p>Instantiate via {@link PCollectionViews#singletonView}.
+ *
+ * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
+ * view type.
+ */
+ @Deprecated
+ @Experimental(Kind.CORE_RUNNERS_ONLY)
+ public static class SingletonViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, T> {
+ @Nullable private byte[] encodedDefaultValue;
+ @Nullable private transient T defaultValue;
+ @Nullable private Coder<T> valueCoder;
+ private boolean hasDefault;
+
+ private SingletonViewFn(boolean hasDefault, T defaultValue, Coder<T> valueCoder) {
+ this.hasDefault = hasDefault;
+ this.defaultValue = defaultValue;
+ this.valueCoder = valueCoder;
+ if (hasDefault) {
+ try {
+ this.encodedDefaultValue = CoderUtils.encodeToByteArray(valueCoder, defaultValue);
+ } catch (IOException e) {
+ throw new RuntimeException("Unexpected IOException: ", e);
+ }
+ }
+ }
+
+ /**
+ * Returns the default value that was specified.
+ *
+ * <p>For internal use only.
+ *
+ * @throws NoSuchElementException if no default was specified.
+ */
+ public T getDefaultValue() {
+ if (!hasDefault) {
+ throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
+ }
+ // Lazily decode the default value once
+ synchronized (this) {
+ if (encodedDefaultValue != null && defaultValue == null) {
+ try {
+ defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue);
+ } catch (IOException e) {
+ throw new RuntimeException("Unexpected IOException: ", e);
+ }
+ }
+ return defaultValue;
+ }
+ }
+
+ @Override
+ public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
+ return Materializations.iterable();
+ }
+
+ @Override
+ public T apply(Iterable<WindowedValue<T>> contents) {
+ try {
+ return Iterables.getOnlyElement(contents).getValue();
+ } catch (NoSuchElementException exc) {
+ return getDefaultValue();
+ } catch (IllegalArgumentException exc) {
+ throw new IllegalArgumentException(
+ "PCollection with more than one element "
+ + "accessed as a singleton view.");
+ }
+ }
+ }
+
+ /**
+ * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code Iterable<T>}.
+ *
+ * <p>For internal use only.
+ *
+ * <p>Instantiate via {@link PCollectionViews#iterableView}.
+ *
+ * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
+ * view type.
+ */
+ @Deprecated
+ @Experimental(Kind.CORE_RUNNERS_ONLY)
+ public static class IterableViewFn<T>
+ extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
+ @Override
+ public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
+ return Materializations.iterable();
+ }
+
+ @Override
+ public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
+ return Iterables.unmodifiableIterable(
+ Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public T apply(WindowedValue<T> input) {
+ return input.getValue();
+ }
+ }));
+ }
+ }
+
+ /**
+ * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code List<T>}.
+ *
+ * <p>For internal use only.
+ *
+ * <p>Instantiate via {@link PCollectionViews#listView}.
+ *
+ * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
+ * view type.
+ */
+ @Deprecated
+ @Experimental(Kind.CORE_RUNNERS_ONLY)
+ public static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, List<T>> {
+ @Override
+ public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
+ return Materializations.iterable();
+ }
+
+ @Override
+ public List<T> apply(Iterable<WindowedValue<T>> contents) {
+ return ImmutableList.copyOf(
+ Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public T apply(WindowedValue<T> input) {
+ return input.getValue();
+ }
+ }));
+ }
+ }
+
+ /**
+ * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>}
+ * to {@code Map<K, Iterable<V>>}.
+ *
+ * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
+ * view type.
+ */
+ @Deprecated
+ @Experimental(Kind.CORE_RUNNERS_ONLY)
+ public static class MultimapViewFn<K, V>
+ extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>> {
+ @Override
+ public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() {
+ return Materializations.iterable();
+ }
+
+ @Override
+ public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
+ Multimap<K, V> multimap = HashMultimap.create();
+ for (WindowedValue<KV<K, V>> elem : elements) {
+ KV<K, V> kv = elem.getValue();
+ multimap.put(kv.getKey(), kv.getValue());
+ }
+ // Safe covariant cast that Java cannot express without rawtypes, even with unchecked casts
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Map<K, Iterable<V>> resultMap = (Map) multimap.asMap();
+ return Collections.unmodifiableMap(resultMap);
+ }
+ }
+
+ /**
+ * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>} with one value per key to
+ * {@code Map<K, V>}.
+ *
+ * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
+ * view type.
+ */
+ @Deprecated
+ @Experimental(Kind.CORE_RUNNERS_ONLY)
+ public static class MapViewFn<K, V> extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>> {
+ @Override
+ public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() {
+ return Materializations.iterable();
+ }
+
+ /**
+ * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}.
+ */
+ @Override
+ public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
+ Map<K, V> map = new HashMap<>();
+ for (WindowedValue<KV<K, V>> elem : elements) {
+ KV<K, V> kv = elem.getValue();
+ if (map.containsKey(kv.getKey())) {
+ throw new IllegalArgumentException("Duplicate values for " + kv.getKey());
+ }
+ map.put(kv.getKey(), kv.getValue());
+ }
+ return Collections.unmodifiableMap(map);
+ }
+ }
+
+ /**
+ * A class for {@link PCollectionView} implementations, with additional type parameters
+ * that are not visible at pipeline assembly time when the view is used as a side input.
+ *
+ * <p>For internal use only.
+ */
+ public static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow>
+ extends PValueBase
+ implements PCollectionView<ViewT> {
+ /** The {@link PCollection} this view was originally created from. */
+ private transient PCollection<ElemT> pCollection;
+
+ /** A unique tag for the view, typed according to the elements underlying the view. */
+ private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
+
+ private WindowMappingFn<W> windowMappingFn;
+
+ /** The windowing strategy for the PCollection underlying the view. */
+ private WindowingStrategy<?, W> windowingStrategy;
+
+ /** The coder for the elements underlying the view. */
+ private Coder<Iterable<WindowedValue<ElemT>>> coder;
+
+ /**
+ * The typed {@link ViewFn} for this view.
+ */
+ private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
+
+ /**
+ * Call this constructor to initialize the fields for which this base class provides
+ * boilerplate accessors.
+ */
+ private SimplePCollectionView(
+ PCollection<ElemT> pCollection,
+ TupleTag<Iterable<WindowedValue<ElemT>>> tag,
+ ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+ WindowMappingFn<W> windowMappingFn,
+ WindowingStrategy<?, W> windowingStrategy,
+ Coder<ElemT> valueCoder) {
+ super(pCollection.getPipeline());
+ this.pCollection = pCollection;
+ if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
+ throw new IllegalArgumentException("WindowFn of PCollectionView cannot be InvalidWindows");
+ }
+ this.windowMappingFn = windowMappingFn;
+ this.tag = tag;
+ this.windowingStrategy = windowingStrategy;
+ this.viewFn = viewFn;
+ this.coder =
+ IterableCoder.of(WindowedValue.getFullCoder(
+ valueCoder, windowingStrategy.getWindowFn().windowCoder()));
+ }
+
+ /**
+ * Call this constructor to initialize the fields for which this base class provides
+ * boilerplate accessors, with an auto-generated tag.
+ */
+ private SimplePCollectionView(
+ PCollection<ElemT> pCollection,
+ ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+ WindowMappingFn<W> windowMappingFn,
+ WindowingStrategy<?, W> windowingStrategy,
+ Coder<ElemT> valueCoder) {
+ this(
+ pCollection,
+ new TupleTag<Iterable<WindowedValue<ElemT>>>(),
+ viewFn,
+ windowMappingFn,
+ windowingStrategy,
+ valueCoder);
+ }
+
+ /**
+ * For serialization only. Do not use directly.
+ */
+ @SuppressWarnings("unused") // used for serialization
+ protected SimplePCollectionView() {
+ super();
+ }
+
+ @Override
+ public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
+ // Safe cast: it is required that the rest of the SDK maintain the invariant
+ // that a PCollectionView is only provided an iterable for the elements of an
+ // appropriately typed PCollection.
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn;
+ return untypedViewFn;
+ }
+
+ @Override
+ public WindowMappingFn<?> getWindowMappingFn() {
+ return windowMappingFn;
+ }
+
+ @Override
+ public PCollection<?> getPCollection() {
+ return pCollection;
+ }
+
+ /**
+ * Returns a unique {@link TupleTag} identifying this {@link PCollectionView}.
+ *
+ * <p>For internal use only by runner implementors.
+ */
+ @Override
+ public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
+ // Safe cast: It is required that the rest of the SDK maintain the invariant that
+ // this tag is only used to access the contents of an appropriately typed underlying
+ // PCollection
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ TupleTag<Iterable<WindowedValue<?>>> untypedTag = (TupleTag) tag;
+ return untypedTag;
+ }
+
+ /**
+ * Returns the {@link WindowingStrategy} of this {@link PCollectionView}, which should
+ * be that of the underlying {@link PCollection}.
+ *
+ * <p>For internal use only by runner implementors.
+ */
+ @Override
+ public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
+ return windowingStrategy;
+ }
+
+ @Override
+ public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
+ // Safe cast: It is required that the rest of the SDK only use this untyped coder
+ // for the elements of an appropriately typed underlying PCollection.
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ Coder<Iterable<WindowedValue<?>>> untypedCoder = (Coder) coder;
+ return untypedCoder;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tag);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof PCollectionView)) {
+ return false;
+ }
+ @SuppressWarnings("unchecked")
+ PCollectionView<?> otherView = (PCollectionView<?>) other;
+ return tag.equals(otherView.getTagInternal());
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("tag", tag).toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index f74d673..1bb71bb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -34,10 +34,10 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Matchers;
http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 38da2d9..a3b21ee 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -121,11 +121,11 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;