You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:39 UTC
[15/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/WindowFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/WindowFn.java
deleted file mode 100644
index d51fc7e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/WindowFn.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.common.collect.Ordering;
-
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * The argument to the {@link Window} transform used to assign elements into
- * windows and to determine how windows are merged. See {@link Window} for more
- * information on how {@code WindowFn}s are used and for a library of
- * predefined {@code WindowFn}s.
- *
- * <p>Users will generally want to use the predefined
- * {@code WindowFn}s, but it is also possible to create new
- * subclasses.
- *
- * <p>To create a custom {@code WindowFn}, inherit from this class and override all required
- * methods. If no merging is required, inherit from {@link NonMergingWindowFn}
- * instead. If no merging is required and each element is assigned to a single window, inherit from
- * {@code PartitioningWindowFn}. Inheriting from the most specific subclass will enable more
- * optimizations in the runner.
- *
- * @param <T> type of elements being windowed
- * @param <W> {@link BoundedWindow} subclass used to represent the
- * windows used by this {@code WindowFn}
- */
-public abstract class WindowFn<T, W extends BoundedWindow>
- implements Serializable {
- // AssignContext and MergeContext are declared without "static", so every instance is bound to
- // an enclosing WindowFn instance.
- /**
- * Information available when running {@link #assignWindows}.
- */
- public abstract class AssignContext {
- /**
- * Returns the current element.
- */
- public abstract T element();
-
- /**
- * Returns the timestamp of the current element.
- */
- public abstract Instant timestamp();
-
- /**
- * Returns the windows the current element was in, prior to this
- * {@code WindowFn} being called.
- */
- public abstract Collection<? extends BoundedWindow> windows();
- }
-
- /**
- * Returns the windows into which the current element should be placed.
- *
- * <p>The element, its timestamp, and the windows it was in before this {@code WindowFn} ran are
- * all available from {@code c}.
- *
- * @param c the element being assigned, together with its timestamp and prior windows
- * @return the windows to which the element is assigned
- * @throws Exception implementations may throw; the exception propagates to the caller
- */
- public abstract Collection<W> assignWindows(AssignContext c) throws Exception;
-
- /**
- * Information available when running {@link #mergeWindows}.
- */
- public abstract class MergeContext {
- /**
- * Returns the current set of windows.
- */
- public abstract Collection<W> windows();
-
- /**
- * Signals to the framework that the windows in {@code toBeMerged} should
- * be merged together to form {@code mergeResult}.
- *
- * <p>{@code toBeMerged} should be a subset of {@link #windows}
- * and disjoint from the {@code toBeMerged} set of previous calls
- * to {@code merge}.
- *
- * <p>{@code mergeResult} must either not be in {@link #windows} or be in
- * {@code toBeMerged}.
- *
- * @param toBeMerged the windows to merge
- * @param mergeResult the window that the merged windows become
- * @throws IllegalArgumentException if any elements of toBeMerged are not
- * in windows(), or have already been merged
- */
- public abstract void merge(Collection<W> toBeMerged, W mergeResult)
- throws Exception;
- }
-
- /**
- * Does whatever merging of windows is necessary.
- *
- * <p>Implementations inspect the current windows through {@link MergeContext#windows} on
- * {@code c}. They report each group of windows to combine by calling
- * {@link MergeContext#merge} on {@code c}.
- *
- * <p>See {@link MergeOverlappingIntervalWindows#mergeWindows} for an
- * example of how to override this method.
- */
- public abstract void mergeWindows(MergeContext c) throws Exception;
-
- /**
- * Returns whether this performs the same merging as the given
- * {@code WindowFn}.
- */
- public abstract boolean isCompatible(WindowFn<?, ?> other);
-
- /**
- * Returns the {@link Coder} used for serializing the windows used
- * by this windowFn.
- */
- public abstract Coder<W> windowCoder();
-
- /**
- * Returns the window of the side input corresponding to the given window of
- * the main input.
- *
- * <p>This method is abstract, so authors of custom {@code WindowFn}s must implement it.
- */
- public abstract W getSideInputWindow(final BoundedWindow window);
-
- // NOTE(review): getOutputTime() and the default getOutputTimeFn() delegate to each other.
- // getOutputTime() calls getOutputTimeFn().assignOutputTime(...), and the default adapter's
- // assignOutputTime(...) calls back into windowFn.getOutputTime(...). A subclass that overrides
- // neither method will therefore recurse without bound (StackOverflowError) as soon as an output
- // time is assigned. Confirm that every concrete WindowFn overrides at least one of them.
- /**
- * Returns the output time that {@link #getOutputTimeFn} assigns to {@code inputTimestamp}
- * in {@code window}.
- *
- * @deprecated Implement {@link #getOutputTimeFn} to return one of the appropriate
- * {@link OutputTimeFns}, or a custom {@link OutputTimeFn} extending
- * {@link OutputTimeFn.Defaults}.
- */
- @Deprecated
- @Experimental(Kind.OUTPUT_TIME)
- public Instant getOutputTime(Instant inputTimestamp, W window) {
- return getOutputTimeFn().assignOutputTime(inputTimestamp, window);
- }
-
- /**
- * Provides a default implementation for {@link WindowingStrategy#getOutputTimeFn()}.
- * See the full specification there.
- *
- * <p>If this {@link WindowFn} doesn't produce overlapping windows, this need not (and probably
- * should not) override any of the default implementations in {@link OutputTimeFn.Defaults}.
- *
- * <p>If this {@link WindowFn} does produce overlapping windows that can be predicted here, it is
- * suggested that the result in later overlapping windows is past the end of earlier windows so
- * that the later windows don't prevent the watermark from progressing past the end of the earlier
- * window.
- *
- * <p>For example, a timestamp in a sliding window should be moved past the beginning of the next
- * sliding window. See {@link SlidingWindows#getOutputTimeFn}.
- */
- @Experimental(Kind.OUTPUT_TIME)
- public OutputTimeFn<? super W> getOutputTimeFn() {
- // Legacy adapter: its assignOutputTime() calls back into this WindowFn's getOutputTime().
- return new OutputAtEarliestAssignedTimestamp<>(this);
- }
-
- /**
- * Returns true if this {@code WindowFn} never needs to merge any windows.
- */
- public boolean isNonMerging() {
- return false;
- }
-
- /**
- * Returns true if this {@code WindowFn} assigns each element to a single window.
- */
- public boolean assignsToSingleWindow() {
- return false;
- }
-
- /**
- * A compatibility adapter that will return the assigned timestamps according to the
- * {@link WindowFn}, which was the prior policy. Specifying the assigned output timestamps
- * on the {@link WindowFn} is now deprecated.
- */
- private static class OutputAtEarliestAssignedTimestamp<W extends BoundedWindow>
- extends OutputTimeFn.Defaults<W> {
-
- private final WindowFn<?, W> windowFn;
-
- public OutputAtEarliestAssignedTimestamp(WindowFn<?, W> windowFn) {
- this.windowFn = windowFn;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return the result of {@link WindowFn#getOutputTime windowFn.getOutputTime()}.
- */
- @Override
- @SuppressWarnings("deprecation") // this is an adapter for the deprecated behavior
- public Instant assignOutputTime(Instant timestamp, W window) {
- return windowFn.getOutputTime(timestamp, window);
- }
-
- @Override
- public Instant combine(Instant outputTime, Instant otherOutputTime) {
- // Keep the earlier of the two output timestamps.
- return Ordering.natural().min(outputTime, otherOutputTime);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. When the {@link OutputTimeFn} is not overridden by {@link WindowFn}
- * or {@link WindowingStrategy}, the minimum output timestamp is taken, which depends
- * only on the minimum input timestamp by monotonicity of {@link #assignOutputTime}.
- */
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/package-info.java
deleted file mode 100644
index 65ccf71..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/package-info.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines the {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window} transform
- * for dividing the elements in a PCollection into windows, and the
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Trigger} for controlling when those
- * elements are output.
- *
- * <p>{@code Window} logically divides up or groups the elements of a
- * {@link com.google.cloud.dataflow.sdk.values.PCollection} into finite windows according to a
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}.
- * The output of {@code Window} contains the same elements as input, but they
- * have been logically assigned to windows. Any subsequent
- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}s, including those
- * within composite transforms, will group by the combination of key and
- * window.
- *
- * <p>Windowing a {@code PCollection} allows chunks of it to be processed
- * individually, before the entire {@code PCollection} is available. This is
- * especially important for {@code PCollection}s with unbounded size, since the full
- * {@code PCollection} is never available at once.
- *
- * <p>For {@code PCollection}s with a bounded size, by default, all data is implicitly in a
- * single window, and this replicates conventional batch mode. However, windowing can still be a
- * convenient way to express time-sliced algorithms over bounded {@code PCollection}s.
- *
- * <p>As elements are assigned to a window, they are placed into a pane. When the trigger fires,
- * all of the elements in the current pane are output.
- *
- * <p>The {@link com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger} will output a
- * window when the system watermark passes the end of the window. See
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark} for details on the
- * watermark.
- */
-package com.google.cloud.dataflow.sdk.transforms.windowing;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
deleted file mode 100644
index 69350cb..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ActiveWindowSet.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-
-import java.util.Collection;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * Track which active windows have their state associated with merged-away windows.
- *
- * <p>When windows are merged we must track which state previously associated with the merged
- * windows must now be associated with the result window. Some of that state may be combined
- * eagerly when the windows are merged. The rest is combined lazily when the final state is
- * actually required when emitting a pane. We keep track of this using an
- * {@link ActiveWindowSet}.
- *
- * <p>An {@link ActiveWindowSet} considers a window to be in one of the following states:
- *
- * <ol>
- * <li><b>NEW</b>: The initial state for a window on an incoming element; we do not yet know
- * if it should be merged into an ACTIVE window, or whether it is already present as an
- * ACTIVE window, since we have not yet called
- * {@link WindowFn#mergeWindows}.</li>
- * <li><b>ACTIVE</b>: A window that has state associated with it and has not itself been merged
- * away. The window may have one or more <i>state address</i> windows under which its
- * non-empty state is stored. A state value for an ACTIVE window must be derived by reading
- * the state in all of its state address windows.</li>
- * <li><b>EPHEMERAL</b>: A NEW window that has been merged into an ACTIVE window before any state
- * has been associated with that window. Thus the window is neither ACTIVE nor MERGED. These
- * windows are not persistently represented since if they reappear the merge function should
- * again redirect them to an ACTIVE window. EPHEMERAL windows are an optimization for
- * the common case of in-order events and
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Sessions session windows}: state is
- * never associated with windows that are created and immediately merged away.</li>
- * <li><b>MERGED</b>: An ACTIVE window has been merged into another ACTIVE window after it had
- * state associated with it. The window will thus appear as a state address window for exactly
- * one ACTIVE window.</li>
- * <li><b>EXPIRED</b>: The window has expired and may have been garbage collected. No new elements
- * (even late elements) will ever be assigned to that window. These windows are not explicitly
- * represented anywhere; it is expected that the user of {@link ActiveWindowSet} will store
- * no state associated with the window.</li>
- * </ol>
- *
- * <p>If no windows will ever be merged we can use the trivial implementation {@link
- * NonMergingActiveWindowSet}. Otherwise, the actual implementation of this data structure is in
- * {@link MergingActiveWindowSet}.
- *
- * @param <W> the type of window being managed
- */
-public interface ActiveWindowSet<W extends BoundedWindow> {
- /**
- * Callback for {@link ActiveWindowSet#merge}.
- */
- interface MergeCallback<W extends BoundedWindow> {
- /**
- * Called when windows are about to be merged, but before any {@link #onMerge} callback
- * has been made.
- *
- * @param toBeMerged the windows about to be merged
- * @param activeToBeMerged the subset of {@code toBeMerged} that is currently ACTIVE
- * @param mergeResult the result window, either a member of {@code toBeMerged} or new
- */
- void prefetchOnMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult)
- throws Exception;
-
- /**
- * Called when windows are about to be merged, after all {@link #prefetchOnMerge} calls
- * have been made, but before the active window set has been updated to reflect the merge.
- *
- * @param toBeMerged the windows about to be merged.
- * @param activeToBeMerged the subset of {@code toBeMerged} corresponding to windows which
- * are currently ACTIVE (and about to be merged). The remaining windows have been deemed
- * EPHEMERAL, and thus have no state associated with them.
- * @param mergeResult the result window, either a member of {@code toBeMerged} or new.
- */
- void onMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult)
- throws Exception;
- }
-
- /**
- * Remove EPHEMERAL windows since we only need to know about them while processing new elements.
- */
- void removeEphemeralWindows();
-
- /**
- * Save any state changes needed.
- */
- void persist();
-
- /**
- * Return the ACTIVE window into which {@code window} has been merged.
- * Return {@code window} itself if it is ACTIVE. Return null if {@code window} has not
- * yet been seen.
- */
- @Nullable
- W representative(W window);
-
- /**
- * Return (a view of) the set of currently ACTIVE windows.
- */
- Set<W> getActiveWindows();
-
- /**
- * Return {@code true} if {@code window} is ACTIVE.
- */
- boolean isActive(W window);
-
- /**
- * If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it
- * as NEW. All NEW windows will be accounted for as ACTIVE, MERGED or EPHEMERAL by a call
- * to {@link #merge}.
- */
- void addNew(W window);
-
- /**
- * If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it
- * as ACTIVE.
- */
- void addActive(W window);
-
- /**
- * Remove {@code window} from the set.
- */
- void remove(W window);
-
- /**
- * Invoke {@link WindowFn#mergeWindows} on the {@code WindowFn} associated with this window set,
- * merging as many of the active windows as possible. {@code mergeCallback} will be invoked for
- * each group of windows that are merged. After this no NEW windows will remain, all merge
- * result windows will be ACTIVE, and all windows which have been merged away will not be ACTIVE.
- *
- * @throws Exception declared so that implementations can propagate exceptions raised by
- * {@link WindowFn#mergeWindows} or by {@code mergeCallback}
- */
- void merge(MergeCallback<W> mergeCallback) throws Exception;
-
- /**
- * Signal that all state in {@link #readStateAddresses} for {@code window} has been merged into
- * the {@link #writeStateAddress} for {@code window}.
- */
- void merged(W window);
-
- /**
- * Return the state address windows for ACTIVE {@code window} from which all state associated
- * should be read and merged.
- */
- Set<W> readStateAddresses(W window);
-
- /**
- * Return the state address window of ACTIVE {@code window} into which all new state should be
- * written. Always one of the results of {@link #readStateAddresses}.
- */
- W writeStateAddress(W window);
-
- /**
- * Return the state address window into which all new state should be written after
- * ACTIVE windows {@code toBeMerged} have been merged into {@code mergeResult}.
- */
- W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ApiSurface.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ApiSurface.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ApiSurface.java
deleted file mode 100644
index 7a9c877..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ApiSurface.java
+++ /dev/null
@@ -1,642 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
-import com.google.common.reflect.ClassPath;
-import com.google.common.reflect.ClassPath.ClassInfo;
-import com.google.common.reflect.Invokable;
-import com.google.common.reflect.Parameter;
-import com.google.common.reflect.TypeToken;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.lang.reflect.WildcardType;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-/**
- * Represents the API surface of a package prefix. Used for accessing public classes,
- * methods, and the types they reference, to control what dependencies are re-exported.
- *
- * <p>For the purposes of calculating the public API surface, exposure includes any public
- * or protected occurrence of:
- *
- * <ul>
- * <li>superclasses
- * <li>interfaces implemented
- * <li>actual type arguments to generic types
- * <li>array component types
- * <li>method return types
- * <li>method parameter types
- * <li>type variable bounds
- * <li>wildcard bounds
- * </ul>
- *
- * <p>Exposure is a transitive property. The resulting map excludes primitives
- * and array classes themselves.
- *
- * <p>It is prudent (though not required) to prune prefixes like "java" via the builder
- * method {@link #pruningPrefix} to halt the traversal so it does not uselessly catalog references
- * that are not interesting.
- */
-@SuppressWarnings("rawtypes")
-public class ApiSurface {
- private static final Logger logger = LoggerFactory.getLogger(ApiSurface.class);
-
- /**
- * Returns an empty {@link ApiSurface}, with no root classes and no pruning patterns.
- */
- public static ApiSurface empty() {
- logger.debug("Returning an empty ApiSurface");
- return new ApiSurface(Collections.<Class<?>>emptySet(), Collections.<Pattern>emptySet());
- }
-
- /**
- * Returns an {@link ApiSurface} object representing the given package and all subpackages.
- *
- * @throws IOException if the class path cannot be scanned
- */
- public static ApiSurface ofPackage(String packageName) throws IOException {
- return ApiSurface.empty().includingPackage(packageName);
- }
-
- /**
- * Returns an {@link ApiSurface} object representing just the surface of the given class.
- */
- public static ApiSurface ofClass(Class<?> clazz) {
- return ApiSurface.empty().includingClass(clazz);
- }
-
- /**
- * Returns an {@link ApiSurface} like this one, but also including the named
- * package and all of its subpackages.
- *
- * <p>Top-level classes of the package are discovered and loaded through the system class
- * loader. Only those whose modifiers pass {@code exposed(int)} become new root classes.
- *
- * @throws IOException if the class path cannot be scanned
- */
- public ApiSurface includingPackage(String packageName) throws IOException {
- ClassPath classPath = ClassPath.from(ClassLoader.getSystemClassLoader());
-
- Set<Class<?>> newRootClasses = Sets.newHashSet();
- for (ClassInfo classInfo : classPath.getTopLevelClassesRecursive(packageName)) {
- Class<?> clazz = classInfo.load();
- if (exposed(clazz.getModifiers())) {
- newRootClasses.add(clazz);
- }
- }
- logger.debug("Including package {} and subpackages: {}", packageName, newRootClasses);
- newRootClasses.addAll(rootClasses);
-
- return new ApiSurface(newRootClasses, patternsToPrune);
- }
-
- /**
- * Returns an {@link ApiSurface} like this one, but also including the given class as a
- * root class.
- */
- public ApiSurface includingClass(Class<?> clazz) {
- Set<Class<?>> newRootClasses = Sets.newHashSet();
- logger.debug("Including class {}", clazz);
- newRootClasses.add(clazz);
- newRootClasses.addAll(rootClasses);
- return new ApiSurface(newRootClasses, patternsToPrune);
- }
-
- /**
- * Returns an {@link ApiSurface} like this one, but pruning transitive
- * references from classes whose full name (including package) begins with the provided prefix.
- */
- public ApiSurface pruningPrefix(String prefix) {
- return pruningPattern(Pattern.compile(Pattern.quote(prefix) + ".*"));
- }
-
- /**
- * Returns an {@link ApiSurface} like this one, but pruning references from the class whose
- * full name (as returned by {@link Class#getName}) is exactly {@code className}.
- */
- public ApiSurface pruningClassName(String className) {
- return pruningPattern(Pattern.compile(Pattern.quote(className)));
- }
-
- /**
- * Returns an {@link ApiSurface} like this one, but pruning references from the
- * provided class.
- */
- public ApiSurface pruningClass(Class<?> clazz) {
- return pruningClassName(clazz.getName());
- }
-
- /**
- * Returns an {@link ApiSurface} like this one, but pruning transitive references from
- * classes whose full name (including package) matches {@code pattern} in its entirety.
- *
- * <p>The pattern must match the whole class name; it is not searched for within it. Use a
- * trailing {@code .*} to prune by prefix, as {@link #pruningPrefix} does. Only the pattern's
- * source text ({@link Pattern#pattern()}) is kept, so any flags it was compiled with are
- * ignored.
- */
- public ApiSurface pruningPattern(Pattern pattern) {
- Set<Pattern> newPatterns = Sets.newHashSet(patternsToPrune);
- newPatterns.add(pattern);
- return new ApiSurface(rootClasses, newPatterns);
- }
-
- /**
- * Like {@link #pruningPattern(Pattern)}, but first compiles {@code patternString}.
- */
- public ApiSurface pruningPattern(String patternString) {
- return pruningPattern(Pattern.compile(patternString));
- }
-
- /**
- * Returns an unmodifiable view of the root classes of this {@link ApiSurface}. These are the
- * exposed top-level classes of every included package, plus any explicitly included classes.
- *
- * <p>The underlying set is shared with surfaces derived through the {@code pruning*} methods,
- * so it is not handed out in mutable form.
- */
- public Set<Class<?>> getRootClasses() {
- return Collections.unmodifiableSet(rootClasses);
- }
-
- /**
- * Returns an unmodifiable view of the classes exposed by this surface. The exposure graph is
- * computed on first use.
- *
- * <p>The class-level documentation states that primitives and array classes themselves are
- * excluded from the result.
- */
- public Set<Class<?>> getExposedClasses() {
- return Collections.unmodifiableSet(getExposedToExposers().keySet());
- }
-
- /**
- * Returns a path from an exposed class to a root class. There may be many, but this
- * gives only one.
- *
- * <p>The returned list starts with {@code exposedClass}. Each following element is a class that
- * exposes the one before it, and the last element is a root class.
- *
- * @throws IllegalArgumentException if {@code exposedClass} is not exposed, or if it is only
- * reachable through cycles with no path back to a root class
- */
- public List<Class<?>> getAnyExposurePath(Class<?> exposedClass) {
- Set<Class<?>> excluded = Sets.newHashSet();
- excluded.add(exposedClass);
- List<Class<?>> path = getAnyExposurePath(exposedClass, excluded);
- if (path == null) {
- throw new IllegalArgumentException(
- "Class " + exposedClass + " has no path back to any root class."
- + " It should never have been considered exposed.");
- } else {
- return path;
- }
- }
-
- /**
- * Returns a path from an exposed class to a root class that never passes through a class in
- * {@code excluded}. There may be many such paths; this gives only one.
- *
- * <p>Returns null if every candidate path is a cycle or passes through an excluded class.
- *
- * @throws IllegalArgumentException if {@code exposedClass}, or any class reached while
- * searching, has no recorded exposers
- */
- private List<Class<?>> getAnyExposurePath(Class<?> exposedClass, Set<Class<?>> excluded) {
- List<Class<?>> exposurePath = Lists.newArrayList();
- exposurePath.add(exposedClass);
-
- Collection<Class<?>> exposers = getExposedToExposers().get(exposedClass);
- if (exposers.isEmpty()) {
- throw new IllegalArgumentException("Class " + exposedClass + " is not exposed.");
- }
-
- for (Class<?> exposer : exposers) {
- if (excluded.contains(exposer)) {
- continue;
- }
-
- // A null exposer means this is already a root class.
- if (exposer == null) {
- return exposurePath;
- }
-
- // Sets.union yields a view over both sets rather than a copy.
- List<Class<?>> restOfPath = getAnyExposurePath(
- exposer,
- Sets.union(excluded, Sets.newHashSet(exposer)));
-
- if (restOfPath != null) {
- exposurePath.addAll(restOfPath);
- return exposurePath;
- }
- }
- return null;
- }
-
- ////////////////////////////////////////////////////////////////////
-
- // Supplied at construction and never reassigned.
- private final Set<Class<?>> rootClasses;
- private final Set<Pattern> patternsToPrune;
-
- // Populated lazily on first use; null until then.
- private Multimap<Class<?>, Class<?>> exposedToExposers;
- private Pattern prunedPattern;
- private Set<Type> visited;
-
- /**
- * Creates a surface over the given root classes and pruning patterns. Both sets are stored
- * as given, without copying.
- */
- private ApiSurface(Set<Class<?>> rootClasses, Set<Pattern> patternsToPrune) {
- this.patternsToPrune = patternsToPrune;
- this.rootClasses = rootClasses;
- }
-
- /**
- * Returns the exposure graph, building it on the first call.
- *
- * <p>The multimap is the adjacency-list representation of a directed graph. An entry mapping
- * type {@code T1} to type {@code T2} means that {@code T2} directly exposes {@code T1} in its
- * API surface.
- *
- * <p>Cyclic references are almost always present, so the traversal that builds the graph
- * avoids processing the same types repeatedly.
- */
- private Multimap<Class<?>, Class<?>> getExposedToExposers() {
- if (exposedToExposers != null) {
- return exposedToExposers;
- }
- constructExposedToExposers();
- return exposedToExposers;
- }
-
- /**
- * Builds the exposure graph from scratch by walking outward from every root class, after
- * resetting the visited set. See {@link #getExposedToExposers}.
- */
- private void constructExposedToExposers() {
- visited = Sets.newHashSet();
- Supplier<Set<Class<?>>> exposerSetFactory =
- new Supplier<Set<Class<?>>>() {
- @Override
- public Set<Class<?>> get() {
- return Sets.newHashSet();
- }
- };
- exposedToExposers = Multimaps.newSetMultimap(
- Maps.<Class<?>, Collection<Class<?>>>newHashMap(), exposerSetFactory);
-
- for (Class<?> root : rootClasses) {
- // Roots have no exposing class, hence the null cause.
- addExposedTypes(root, null);
- }
- }
-
- /**
- * Returns the single regex combining every pruning pattern, compiling it on the first call.
- */
- private Pattern getPrunedPattern() {
- if (prunedPattern != null) {
- return prunedPattern;
- }
- constructPrunedPattern();
- return prunedPattern;
- }
-
- /**
- * Joins the source text of all pruning patterns into one alternation of the form
- * {@code (p1)|(p2)|...}. See {@link #getPrunedPattern}.
- */
- private void constructPrunedPattern() {
- Set<String> alternatives = Sets.newHashSet();
- for (Pattern each : patternsToPrune) {
- alternatives.add(each.pattern());
- }
- String combined = "(" + Joiner.on(")|(").join(alternatives) + ")";
- prunedPattern = Pattern.compile(combined);
- }
-
- /**
- * Returns whether {@code type} and everything it references should be left out of the graph.
- * The decision is made on its raw class.
- */
- private boolean pruned(Type type) {
- Class<?> rawClass = TypeToken.of(type).getRawType();
- return pruned(rawClass);
- }
-
- /**
- * Returns whether {@code clazz} and everything it references should be left out of the graph.
- * That is the case for primitives, arrays, and classes whose name matches a pruning pattern.
- */
- private boolean pruned(Class<?> clazz) {
- if (clazz.isPrimitive() || clazz.isArray()) {
- return true;
- }
- return getPrunedPattern().matcher(clazz.getName()).matches();
- }
-
- /**
- * Returns whether {@code type} has already been sufficiently processed.
- */
- private boolean done(Type type) {
- return visited.contains(type);
- }
-
- /** Records that {@code cause} exposes {@code exposed}. */
- private void recordExposure(Class<?> exposed, Class<?> cause) {
- exposedToExposers.put(exposed, cause);
- }
-
- /** Records that {@code cause} exposes the raw class of {@code exposed}. */
- private void recordExposure(Type exposed, Class<?> cause) {
- Class<?> rawExposed = TypeToken.of(exposed).getRawType();
- exposedToExposers.put(rawExposed, cause);
- }
-
- /** Marks {@code type} as visited so that later {@link #done} checks report it. */
- private void visit(Type type) {
- visited.add(type);
- }
-
- /**
- * Unwraps {@code type} and adds the types exposed by the underlying {@link Type}.
- * See {@link #addExposedTypes(Type, Class)}.
- */
- private void addExposedTypes(TypeToken type, Class<?> cause) {
- logger.debug(
- "Adding exposed types from {}, which is the type in type token {}", type.getType(), type);
- addExposedTypes(type.getType(), cause);
- }
-
- /**
- * Adds any references learned by following a link from {@code cause} to {@code type}.
- * This will dispatch according to the concrete {@code Type} implementation. See the
- * other overloads of {@code addExposedTypes} for their details.
- *
- * @throws IllegalArgumentException if {@code type} is not a {@link TypeVariable},
- * {@link WildcardType}, {@link GenericArrayType}, {@link ParameterizedType} or {@link Class}
- */
- private void addExposedTypes(Type type, Class<?> cause) {
- if (type instanceof TypeVariable) {
- logger.debug("Adding exposed types from {}, which is a type variable", type);
- addExposedTypes((TypeVariable<?>) type, cause);
- } else if (type instanceof WildcardType) {
- logger.debug("Adding exposed types from {}, which is a wildcard type", type);
- addExposedTypes((WildcardType) type, cause);
- } else if (type instanceof GenericArrayType) {
- logger.debug("Adding exposed types from {}, which is a generic array type", type);
- addExposedTypes((GenericArrayType) type, cause);
- } else if (type instanceof ParameterizedType) {
- logger.debug("Adding exposed types from {}, which is a parameterized type", type);
- addExposedTypes((ParameterizedType) type, cause);
- } else if (type instanceof Class) {
- logger.debug("Adding exposed types from {}, which is a class", type);
- addExposedTypes((Class<?>) type, cause);
- } else {
- // Name the offending implementation so that unexpected Type subclasses can be diagnosed.
- String implementation = (type == null) ? "null" : type.getClass().getName();
- throw new IllegalArgumentException(
- "Unknown implementation of Type: " + implementation + " (" + type + ")");
- }
- }
-
- /**
- * Adds any types exposed to this set. These will
- * come from the (possibly absent) bounds on the
- * type variable.
- */
- private void addExposedTypes(TypeVariable type, Class<?> cause) {
- if (done(type)) {
- return;
- }
- visit(type);
- for (Type bound : type.getBounds()) {
- logger.debug("Adding exposed types from {}, which is a type bound on {}", bound, type);
- addExposedTypes(bound, cause);
- }
- }
-
- /**
- * Adds any types exposed to this set. These will come from the (possibly absent) bounds on the
- * wildcard.
- */
- private void addExposedTypes(WildcardType type, Class<?> cause) {
- visit(type);
- for (Type lowerBound : type.getLowerBounds()) {
- logger.debug(
- "Adding exposed types from {}, which is a type lower bound on wildcard type {}",
- lowerBound,
- type);
- addExposedTypes(lowerBound, cause);
- }
- for (Type upperBound : type.getUpperBounds()) {
- logger.debug(
- "Adding exposed types from {}, which is a type upper bound on wildcard type {}",
- upperBound,
- type);
- addExposedTypes(upperBound, cause);
- }
- }
-
- /**
- * Adds any types exposed from the given array type. The array type itself is not added. The
- * cause of the exposure of the underlying type is considered whatever type exposed the array
- * type.
- */
- private void addExposedTypes(GenericArrayType type, Class<?> cause) {
- if (done(type)) {
- return;
- }
- visit(type);
- logger.debug(
- "Adding exposed types from {}, which is the component type on generic array type {}",
- type.getGenericComponentType(),
- type);
- addExposedTypes(type.getGenericComponentType(), cause);
- }
-
- /**
- * Adds any types exposed to this set. Even if the
- * root type is to be pruned, the actual type arguments
- * are processed.
- */
- private void addExposedTypes(ParameterizedType type, Class<?> cause) {
- // Even if the type is already done, this link to it may be new
- boolean alreadyDone = done(type);
- if (!pruned(type)) {
- visit(type);
- recordExposure(type, cause);
- }
- if (alreadyDone) {
- return;
- }
-
- // For a parameterized type, pruning does not take place
- // here, only for the raw class.
- // The type parameters themselves may not be pruned,
- // for example with List<MyApiType> probably the
- // standard List is pruned, but MyApiType is not.
- logger.debug(
- "Adding exposed types from {}, which is the raw type on parameterized type {}",
- type.getRawType(),
- type);
- addExposedTypes(type.getRawType(), cause);
- for (Type typeArg : type.getActualTypeArguments()) {
- logger.debug(
- "Adding exposed types from {}, which is a type argument on parameterized type {}",
- typeArg,
- type);
- addExposedTypes(typeArg, cause);
- }
- }
-
- /**
- * Adds a class and all of the types it exposes. The cause
- * of the class being exposed is given, and the cause
- * of everything within the class is that class itself.
- */
- private void addExposedTypes(Class<?> clazz, Class<?> cause) {
- if (pruned(clazz)) {
- return;
- }
- // Even if `clazz` has been visited, the link from `cause` may be new
- boolean alreadyDone = done(clazz);
- visit(clazz);
- recordExposure(clazz, cause);
- if (alreadyDone || pruned(clazz)) {
- return;
- }
-
- TypeToken<?> token = TypeToken.of(clazz);
- for (TypeToken<?> superType : token.getTypes()) {
- if (!superType.equals(token)) {
- logger.debug(
- "Adding exposed types from {}, which is a super type token on {}", superType, clazz);
- addExposedTypes(superType, clazz);
- }
- }
- for (Class innerClass : clazz.getDeclaredClasses()) {
- if (exposed(innerClass.getModifiers())) {
- logger.debug(
- "Adding exposed types from {}, which is an exposed inner class of {}",
- innerClass,
- clazz);
- addExposedTypes(innerClass, clazz);
- }
- }
- for (Field field : clazz.getDeclaredFields()) {
- if (exposed(field.getModifiers())) {
- logger.debug("Adding exposed types from {}, which is an exposed field on {}", field, clazz);
- addExposedTypes(field, clazz);
- }
- }
- for (Invokable invokable : getExposedInvokables(token)) {
- logger.debug(
- "Adding exposed types from {}, which is an exposed invokable on {}", invokable, clazz);
- addExposedTypes(invokable, clazz);
- }
- }
-
- private void addExposedTypes(Invokable<?, ?> invokable, Class<?> cause) {
- addExposedTypes(invokable.getReturnType(), cause);
- for (Annotation annotation : invokable.getAnnotations()) {
- logger.debug(
- "Adding exposed types from {}, which is an annotation on invokable {}",
- annotation,
- invokable);
- addExposedTypes(annotation.annotationType(), cause);
- }
- for (Parameter parameter : invokable.getParameters()) {
- logger.debug(
- "Adding exposed types from {}, which is a parameter on invokable {}",
- parameter,
- invokable);
- addExposedTypes(parameter, cause);
- }
- for (TypeToken<?> exceptionType : invokable.getExceptionTypes()) {
- logger.debug(
- "Adding exposed types from {}, which is an exception type on invokable {}",
- exceptionType,
- invokable);
- addExposedTypes(exceptionType, cause);
- }
- }
-
- private void addExposedTypes(Parameter parameter, Class<?> cause) {
- logger.debug(
- "Adding exposed types from {}, which is the type of parameter {}",
- parameter.getType(),
- parameter);
- addExposedTypes(parameter.getType(), cause);
- for (Annotation annotation : parameter.getAnnotations()) {
- logger.debug(
- "Adding exposed types from {}, which is an annotation on parameter {}",
- annotation,
- parameter);
- addExposedTypes(annotation.annotationType(), cause);
- }
- }
-
- private void addExposedTypes(Field field, Class<?> cause) {
- addExposedTypes(field.getGenericType(), cause);
- for (Annotation annotation : field.getDeclaredAnnotations()) {
- logger.debug(
- "Adding exposed types from {}, which is an annotation on field {}", annotation, field);
- addExposedTypes(annotation.annotationType(), cause);
- }
- }
-
- /**
- * Returns an {@link Invokable} for each public methods or constructors of a type.
- */
- private Set<Invokable> getExposedInvokables(TypeToken<?> type) {
- Set<Invokable> invokables = Sets.newHashSet();
-
- for (Constructor constructor : type.getRawType().getConstructors()) {
- if (0 != (constructor.getModifiers() & (Modifier.PUBLIC | Modifier.PROTECTED))) {
- invokables.add(type.constructor(constructor));
- }
- }
-
- for (Method method : type.getRawType().getMethods()) {
- if (0 != (method.getModifiers() & (Modifier.PUBLIC | Modifier.PROTECTED))) {
- invokables.add(type.method(method));
- }
- }
-
- return invokables;
- }
-
- /**
- * Returns true of the given modifier bitmap indicates exposure (public or protected access).
- */
- private boolean exposed(int modifiers) {
- return 0 != (modifiers & (Modifier.PUBLIC | Modifier.PROTECTED));
- }
-
-
- ////////////////////////////////////////////////////////////////////////////
-
- public static ApiSurface getSdkApiSurface() throws IOException {
- return ApiSurface.ofPackage("com.google.cloud.dataflow")
- .pruningPattern("com[.]google[.]cloud[.]dataflow.*Test")
- .pruningPattern("com[.]google[.]cloud[.]dataflow.*Benchmark")
- .pruningPrefix("com.google.cloud.dataflow.integration")
- .pruningPrefix("java")
- .pruningPrefix("com.google.api")
- .pruningPrefix("com.google.auth")
- .pruningPrefix("com.google.bigtable.v1")
- .pruningPrefix("com.google.cloud.bigtable.config")
- .pruningPrefix("com.google.cloud.bigtable.grpc.Bigtable*Name")
- .pruningPrefix("com.google.protobuf")
- .pruningPrefix("org.joda.time")
- .pruningPrefix("org.apache.avro")
- .pruningPrefix("org.junit")
- .pruningPrefix("com.fasterxml.jackson.annotation");
- }
-
- public static void main(String[] args) throws Exception {
- List<String> names = Lists.newArrayList();
- for (Class clazz : getSdkApiSurface().getExposedClasses()) {
- names.add(clazz.getName());
- }
- List<String> sortedNames = Lists.newArrayList(names);
- Collections.sort(sortedNames);
-
- for (String name : sortedNames) {
- System.out.println(name);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppEngineEnvironment.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppEngineEnvironment.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppEngineEnvironment.java
deleted file mode 100644
index c7fe4b4..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppEngineEnvironment.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import java.lang.reflect.InvocationTargetException;
-
/** Stores whether we are running within AppEngine or not. */
public class AppEngineEnvironment {
  /**
   * True if running inside of AppEngine, false otherwise. Evaluated once, when this class is
   * initialized.
   */
  @Deprecated
  public static final boolean IS_APP_ENGINE = isAppEngine();

  /**
   * Attempts to detect whether we are inside of AppEngine.
   *
   * <p>Deliberately copied from the private Guava method
   * {@code com.google.common.util.concurrent.MoreExecutors#isAppEngine}.
   *
   * <p>Returns false immediately when the {@code com.google.appengine.runtime.environment} system
   * property is unset. Otherwise it reflectively calls
   * {@code com.google.apphosting.api.ApiProxy.getCurrentEnvironment()} and reports whether the
   * result is non-null. Any reflective failure is treated as "not on AppEngine".
   *
   * @return true if we are inside of AppEngine, false otherwise.
   */
  static boolean isAppEngine() {
    if (System.getProperty("com.google.appengine.runtime.environment") == null) {
      return false;
    }
    try {
      // If the current environment is null, we're not inside AppEngine.
      return Class.forName("com.google.apphosting.api.ApiProxy")
          .getMethod("getCurrentEnvironment")
          .invoke(null) != null;
    } catch (ClassNotFoundException
        | InvocationTargetException
        | IllegalAccessException
        | NoSuchMethodException e) {
      // ClassNotFoundException: ApiProxy doesn't exist, so we're not on AppEngine at all.
      // InvocationTargetException: ApiProxy threw, so we're not in a proper AppEngine environment.
      // IllegalAccessException / NoSuchMethodException: not a supported version of AppEngine.
      return false;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppliedCombineFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppliedCombineFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppliedCombineFn.java
deleted file mode 100644
index 512d72d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppliedCombineFn.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.common.annotations.VisibleForTesting;
-
-import java.io.Serializable;
-
-/**
- * A {@link KeyedCombineFnWithContext} with a fixed accumulator coder. This is created from a
- * specific application of the {@link KeyedCombineFnWithContext}.
- *
- * <p>Because the {@code AccumT} may reference {@code InputT}, the specific {@code Coder<AccumT>}
- * may depend on the {@code Coder<InputT>}.
- *
- * @param <K> type of keys
- * @param <InputT> type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
-public class AppliedCombineFn<K, InputT, AccumT, OutputT> implements Serializable {
-
- private final PerKeyCombineFn<K, InputT, AccumT, OutputT> fn;
- private final Coder<AccumT> accumulatorCoder;
-
- private final Iterable<PCollectionView<?>> sideInputViews;
- private final KvCoder<K, InputT> kvCoder;
- private final WindowingStrategy<?, ?> windowingStrategy;
-
- private AppliedCombineFn(PerKeyCombineFn<K, InputT, AccumT, OutputT> fn,
- Coder<AccumT> accumulatorCoder, Iterable<PCollectionView<?>> sideInputViews,
- KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
- this.fn = fn;
- this.accumulatorCoder = accumulatorCoder;
- this.sideInputViews = sideInputViews;
- this.kvCoder = kvCoder;
- this.windowingStrategy = windowingStrategy;
- }
-
- public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
- withAccumulatorCoder(
- PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
- Coder<AccumT> accumCoder) {
- return withAccumulatorCoder(fn, accumCoder, null, null, null);
- }
-
- public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
- withAccumulatorCoder(
- PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
- Coder<AccumT> accumCoder, Iterable<PCollectionView<?>> sideInputViews,
- KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
- // Casting down the K and InputT is safe because they're only used as inputs.
- @SuppressWarnings("unchecked")
- PerKeyCombineFn<K, InputT, AccumT, OutputT> clonedFn =
- (PerKeyCombineFn<K, InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
- return create(clonedFn, accumCoder, sideInputViews, kvCoder, windowingStrategy);
- }
-
- @VisibleForTesting
- public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
- withInputCoder(PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
- CoderRegistry registry, KvCoder<K, InputT> kvCoder) {
- return withInputCoder(fn, registry, kvCoder, null, null);
- }
-
- public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
- withInputCoder(PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
- CoderRegistry registry, KvCoder<K, InputT> kvCoder,
- Iterable<PCollectionView<?>> sideInputViews, WindowingStrategy<?, ?> windowingStrategy) {
- // Casting down the K and InputT is safe because they're only used as inputs.
- @SuppressWarnings("unchecked")
- PerKeyCombineFn<K, InputT, AccumT, OutputT> clonedFn =
- (PerKeyCombineFn<K, InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
- try {
- Coder<AccumT> accumulatorCoder = clonedFn.getAccumulatorCoder(
- registry, kvCoder.getKeyCoder(), kvCoder.getValueCoder());
- return create(clonedFn, accumulatorCoder, sideInputViews, kvCoder, windowingStrategy);
- } catch (CannotProvideCoderException e) {
- throw new IllegalStateException("Could not determine coder for accumulator", e);
- }
- }
-
- private static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT> create(
- PerKeyCombineFn<K, InputT, AccumT, OutputT> fn,
- Coder<AccumT> accumulatorCoder, Iterable<PCollectionView<?>> sideInputViews,
- KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
- return new AppliedCombineFn<>(
- fn, accumulatorCoder, sideInputViews, kvCoder, windowingStrategy);
- }
-
- public PerKeyCombineFn<K, InputT, AccumT, OutputT> getFn() {
- return fn;
- }
-
- public Iterable<PCollectionView<?>> getSideInputViews() {
- return sideInputViews;
- }
-
- public Coder<AccumT> getAccumulatorCoder() {
- return accumulatorCoder;
- }
-
- public KvCoder<K, InputT> getKvCoder() {
- return kvCoder;
- }
-
- public WindowingStrategy<?, ?> getWindowingStrategy() {
- return windowingStrategy;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindowsDoFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindowsDoFn.java
deleted file mode 100644
index ca59c53..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindowsDoFn.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-
-/**
- * {@link DoFn} that tags elements of a PCollection with windows, according
- * to the provided {@link WindowFn}.
- * @param <T> Type of elements being windowed
- * @param <W> Window type
- */
-@SystemDoFnInternal
-public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> {
- private WindowFn<? super T, W> fn;
-
- public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
- this.fn = fn;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void processElement(final ProcessContext c) throws Exception {
- Collection<W> windows =
- ((WindowFn<T, W>) fn).assignWindows(
- ((WindowFn<T, W>) fn).new AssignContext() {
- @Override
- public T element() {
- return c.element();
- }
-
- @Override
- public Instant timestamp() {
- return c.timestamp();
- }
-
- @Override
- public Collection<? extends BoundedWindow> windows() {
- return c.windowingInternals().windows();
- }
- });
-
- c.windowingInternals()
- .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
deleted file mode 100644
index e94d414..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.NanoClock;
-import com.google.common.base.Preconditions;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Extension of {@link AttemptBoundedExponentialBackOff} that bounds the total time that the backoff
- * is happening as well as the amount of retries. Acts exactly as a AttemptBoundedExponentialBackOff
- * unless the time interval has expired since the object was created. At this point, it will always
- * return BackOff.STOP. Calling reset() resets both the timer and the number of retry attempts,
- * unless a custom ResetPolicy (ResetPolicy.ATTEMPTS or ResetPolicy.TIMER) is passed to the
- * constructor.
- *
- * <p>Implementation is not thread-safe.
- */
-public class AttemptAndTimeBoundedExponentialBackOff extends AttemptBoundedExponentialBackOff {
- private long endTimeMillis;
- private long maximumTotalWaitTimeMillis;
- private ResetPolicy resetPolicy;
- private final NanoClock nanoClock;
- // NanoClock.SYSTEM has a max elapsed time of 292 years or 2^63 ns. Here, we choose 2^53 ns as
- // a smaller but still huge limit.
- private static final long MAX_ELAPSED_TIME_MILLIS = 1L << 53;
-
- /**
- * A ResetPolicy controls the behavior of this BackOff when reset() is called. By default, both
- * the number of attempts and the time bound for the BackOff are reset, but an alternative
- * ResetPolicy may be set to only reset one of these two.
- */
- public static enum ResetPolicy {
- ALL,
- ATTEMPTS,
- TIMER
- }
-
- /**
- * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
- *
- * @param maximumNumberOfAttempts The maximum number of attempts it will make.
- * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
- * @param maximumTotalWaitTimeMillis The maximum total time that this object will
- * allow more attempts in milliseconds.
- */
- public AttemptAndTimeBoundedExponentialBackOff(
- int maximumNumberOfAttempts, long initialIntervalMillis, long maximumTotalWaitTimeMillis) {
- this(
- maximumNumberOfAttempts,
- initialIntervalMillis,
- maximumTotalWaitTimeMillis,
- ResetPolicy.ALL,
- NanoClock.SYSTEM);
- }
-
- /**
- * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
- *
- * @param maximumNumberOfAttempts The maximum number of attempts it will make.
- * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
- * @param maximumTotalWaitTimeMillis The maximum total time that this object will
- * allow more attempts in milliseconds.
- * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject
- * to being reset.
- */
- public AttemptAndTimeBoundedExponentialBackOff(
- int maximumNumberOfAttempts,
- long initialIntervalMillis,
- long maximumTotalWaitTimeMillis,
- ResetPolicy resetPolicy) {
- this(
- maximumNumberOfAttempts,
- initialIntervalMillis,
- maximumTotalWaitTimeMillis,
- resetPolicy,
- NanoClock.SYSTEM);
- }
-
- /**
- * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
- *
- * @param maximumNumberOfAttempts The maximum number of attempts it will make.
- * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
- * @param maximumTotalWaitTimeMillis The maximum total time that this object will
- * allow more attempts in milliseconds.
- * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject
- * to being reset.
- * @param nanoClock clock used to measure the time that has passed.
- */
- public AttemptAndTimeBoundedExponentialBackOff(
- int maximumNumberOfAttempts,
- long initialIntervalMillis,
- long maximumTotalWaitTimeMillis,
- ResetPolicy resetPolicy,
- NanoClock nanoClock) {
- super(maximumNumberOfAttempts, initialIntervalMillis);
- Preconditions.checkArgument(
- maximumTotalWaitTimeMillis > 0, "Maximum total wait time must be greater than zero.");
- Preconditions.checkArgument(
- maximumTotalWaitTimeMillis < MAX_ELAPSED_TIME_MILLIS,
- "Maximum total wait time must be less than " + MAX_ELAPSED_TIME_MILLIS + " milliseconds");
- Preconditions.checkArgument(resetPolicy != null, "resetPolicy may not be null");
- Preconditions.checkArgument(nanoClock != null, "nanoClock may not be null");
- this.maximumTotalWaitTimeMillis = maximumTotalWaitTimeMillis;
- this.resetPolicy = resetPolicy;
- this.nanoClock = nanoClock;
- // Set the end time for this BackOff. Note that we cannot simply call reset() here since the
- // resetPolicy may not be set to reset the time bound.
- endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis;
- }
-
- @Override
- public void reset() {
- // reset() is called in the constructor of the parent class before resetPolicy and nanoClock are
- // set. In this case, we call the parent class's reset() method and return.
- if (resetPolicy == null) {
- super.reset();
- return;
- }
- // Reset the number of attempts.
- if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.ATTEMPTS) {
- super.reset();
- }
- // Reset the time bound.
- if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.TIMER) {
- endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis;
- }
- }
-
- public void setEndtimeMillis(long endTimeMillis) {
- this.endTimeMillis = endTimeMillis;
- }
-
- @Override
- public long nextBackOffMillis() {
- if (atMaxAttempts()) {
- return BackOff.STOP;
- }
- long backoff = Math.min(super.nextBackOffMillis(), endTimeMillis - getTimeMillis());
- return (backoff > 0 ? backoff : BackOff.STOP);
- }
-
- private long getTimeMillis() {
- return TimeUnit.NANOSECONDS.toMillis(nanoClock.nanoTime());
- }
-
- @Override
- public boolean atMaxAttempts() {
- return super.atMaxAttempts() || getTimeMillis() >= endTimeMillis;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptBoundedExponentialBackOff.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptBoundedExponentialBackOff.java
deleted file mode 100644
index 613316e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptBoundedExponentialBackOff.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.BackOff;
-import com.google.common.base.Preconditions;
-
-/**
- * Implementation of {@link BackOff} that increases the back off period for each retry attempt
- * using a randomization function that grows exponentially.
- *
- * <p>Example: The initial interval is .5 seconds and the maximum number of retries is 10.
- * For 10 tries the sequence will be (values in seconds):
- *
- * <pre>
- * retry# retry_interval randomized_interval
- * 1 0.5 [0.25, 0.75]
- * 2 0.75 [0.375, 1.125]
- * 3 1.125 [0.562, 1.687]
- * 4 1.687 [0.8435, 2.53]
- * 5 2.53 [1.265, 3.795]
- * 6 3.795 [1.897, 5.692]
- * 7 5.692 [2.846, 8.538]
- * 8 8.538 [4.269, 12.807]
- * 9 12.807 [6.403, 19.210]
- * 10 {@link BackOff#STOP}
- * </pre>
- *
- * <p>Implementation is not thread-safe.
- */
-public class AttemptBoundedExponentialBackOff implements BackOff {
- public static final double DEFAULT_MULTIPLIER = 1.5;
- public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
- private final int maximumNumberOfAttempts;
- private final long initialIntervalMillis;
- private int currentAttempt;
-
- public AttemptBoundedExponentialBackOff(int maximumNumberOfAttempts, long initialIntervalMillis) {
- Preconditions.checkArgument(maximumNumberOfAttempts > 0,
- "Maximum number of attempts must be greater than zero.");
- Preconditions.checkArgument(initialIntervalMillis > 0,
- "Initial interval must be greater than zero.");
- this.maximumNumberOfAttempts = maximumNumberOfAttempts;
- this.initialIntervalMillis = initialIntervalMillis;
- reset();
- }
-
- @Override
- public void reset() {
- currentAttempt = 1;
- }
-
- @Override
- public long nextBackOffMillis() {
- if (currentAttempt >= maximumNumberOfAttempts) {
- return BackOff.STOP;
- }
- double currentIntervalMillis = initialIntervalMillis
- * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1);
- double randomOffset = (Math.random() * 2 - 1)
- * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis;
- currentAttempt += 1;
- return Math.round(currentIntervalMillis + randomOffset);
- }
-
- public boolean atMaxAttempts() {
- return currentAttempt >= maximumNumberOfAttempts;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AvroUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AvroUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AvroUtils.java
deleted file mode 100644
index c3a4861..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AvroUtils.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.MoreObjects.firstNonNull;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Verify.verify;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DecoderFactory;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * A set of utilities for working with Avro files.
- *
- * <p>These utilities are based on the <a
- * href="https://avro.apache.org/docs/1.7.7/spec.html">Avro 1.7.7</a> specification.
- */
-public class AvroUtils {
-
- /**
- * Avro file metadata.
- */
- public static class AvroMetadata {
- private byte[] syncMarker;
- private String codec;
- private String schemaString;
-
- AvroMetadata(byte[] syncMarker, String codec, String schemaString) {
- this.syncMarker = syncMarker;
- this.codec = codec;
- this.schemaString = schemaString;
- }
-
- /**
- * The JSON-encoded <a href="https://avro.apache.org/docs/1.7.7/spec.html#schemas">schema</a>
- * string for the file.
- */
- public String getSchemaString() {
- return schemaString;
- }
-
- /**
- * The <a href="https://avro.apache.org/docs/1.7.7/spec.html#Required+Codecs">codec</a> of the
- * file.
- */
- public String getCodec() {
- return codec;
- }
-
- /**
- * The 16-byte sync marker for the file. See the documentation for
- * <a href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object
- * Container File</a> for more information.
- */
- public byte[] getSyncMarker() {
- return syncMarker;
- }
- }
-
- /**
- * Reads the {@link AvroMetadata} from the header of an Avro file.
- *
- * <p>This method parses the header of an Avro
- * <a href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">
- * Object Container File</a>.
- *
- * @throws IOException if the file is an invalid format.
- */
- public static AvroMetadata readMetadataFromFile(String fileName) throws IOException {
- String codec = null;
- String schemaString = null;
- byte[] syncMarker;
- try (InputStream stream =
- Channels.newInputStream(IOChannelUtils.getFactory(fileName).open(fileName))) {
- BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
-
- // The header of an object container file begins with a four-byte magic number, followed
- // by the file metadata (including the schema and codec), encoded as a map. Finally, the
- // header ends with the file's 16-byte sync marker.
- // See https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files for details on
- // the encoding of container files.
-
- // Read the magic number.
- byte[] magic = new byte[DataFileConstants.MAGIC.length];
- decoder.readFixed(magic);
- if (!Arrays.equals(magic, DataFileConstants.MAGIC)) {
- throw new IOException("Missing Avro file signature: " + fileName);
- }
-
- // Read the metadata to find the codec and schema.
- ByteBuffer valueBuffer = ByteBuffer.allocate(512);
- long numRecords = decoder.readMapStart();
- while (numRecords > 0) {
- for (long recordIndex = 0; recordIndex < numRecords; recordIndex++) {
- String key = decoder.readString();
- // readBytes() clears the buffer and returns a buffer where:
- // - position is the start of the bytes read
- // - limit is the end of the bytes read
- valueBuffer = decoder.readBytes(valueBuffer);
- byte[] bytes = new byte[valueBuffer.remaining()];
- valueBuffer.get(bytes);
- if (key.equals(DataFileConstants.CODEC)) {
- codec = new String(bytes, "UTF-8");
- } else if (key.equals(DataFileConstants.SCHEMA)) {
- schemaString = new String(bytes, "UTF-8");
- }
- }
- numRecords = decoder.mapNext();
- }
- if (codec == null) {
- codec = DataFileConstants.NULL_CODEC;
- }
-
- // Finally, read the sync marker.
- syncMarker = new byte[DataFileConstants.SYNC_SIZE];
- decoder.readFixed(syncMarker);
- }
- return new AvroMetadata(syncMarker, codec, schemaString);
- }
-
- /**
- * Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
- * immutable.
- */
- private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
- DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
- // Package private for BigQueryTableRowIterator to use.
- static String formatTimestamp(String timestamp) {
- // timestamp is in "seconds since epoch" format, with scientific notation.
- // e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC".
- // Separate into seconds and microseconds.
- double timestampDoubleMicros = Double.parseDouble(timestamp) * 1000000;
- long timestampMicros = (long) timestampDoubleMicros;
- long seconds = timestampMicros / 1000000;
- int micros = (int) (timestampMicros % 1000000);
- String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(seconds * 1000);
-
- // No sub-second component.
- if (micros == 0) {
- return String.format("%s UTC", dayAndTime);
- }
-
- // Sub-second component.
- int digits = 6;
- int subsecond = micros;
- while (subsecond % 10 == 0) {
- digits--;
- subsecond /= 10;
- }
- String formatString = String.format("%%0%dd", digits);
- String fractionalSeconds = String.format(formatString, subsecond);
- return String.format("%s.%s UTC", dayAndTime, fractionalSeconds);
- }
-
- /**
- * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
- *
- * See <a href="https://cloud.google.com/bigquery/exporting-data-from-bigquery#config">
- * "Avro format"</a> for more information.
- */
- public static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
- return convertGenericRecordToTableRow(record, schema.getFields());
- }
-
- private static TableRow convertGenericRecordToTableRow(
- GenericRecord record, List<TableFieldSchema> fields) {
- TableRow row = new TableRow();
- for (TableFieldSchema subSchema : fields) {
- // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
- // is required, so it may not be null.
- Field field = record.getSchema().getField(subSchema.getName());
- Object convertedValue =
- getTypedCellValue(field.schema(), subSchema, record.get(field.name()));
- if (convertedValue != null) {
- // To match the JSON files exported by BigQuery, do not include null values in the output.
- row.set(field.name(), convertedValue);
- }
- }
- return row;
- }
-
- @Nullable
- private static Object getTypedCellValue(Schema schema, TableFieldSchema fieldSchema, Object v) {
- // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field
- // is optional (and so it may be null), but defaults to "NULLABLE".
- String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
- switch (mode) {
- case "REQUIRED":
- return convertRequiredField(schema.getType(), fieldSchema, v);
- case "REPEATED":
- return convertRepeatedField(schema, fieldSchema, v);
- case "NULLABLE":
- return convertNullableField(schema, fieldSchema, v);
- default:
- throw new UnsupportedOperationException(
- "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode());
- }
- }
-
- private static List<Object> convertRepeatedField(
- Schema schema, TableFieldSchema fieldSchema, Object v) {
- Type arrayType = schema.getType();
- verify(
- arrayType == Type.ARRAY,
- "BigQuery REPEATED field %s should be Avro ARRAY, not %s",
- fieldSchema.getName(),
- arrayType);
- // REPEATED fields are represented as Avro arrays.
- if (v == null) {
- // Handle the case of an empty repeated field.
- return ImmutableList.of();
- }
- @SuppressWarnings("unchecked")
- List<Object> elements = (List<Object>) v;
- ImmutableList.Builder<Object> values = ImmutableList.builder();
- Type elementType = schema.getElementType().getType();
- for (Object element : elements) {
- values.add(convertRequiredField(elementType, fieldSchema, element));
- }
- return values.build();
- }
-
- private static Object convertRequiredField(
- Type avroType, TableFieldSchema fieldSchema, Object v) {
- // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
- // INTEGER type maps to an Avro LONG type.
- checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
- ImmutableMap<String, Type> fieldMap =
- ImmutableMap.<String, Type>builder()
- .put("STRING", Type.STRING)
- .put("INTEGER", Type.LONG)
- .put("FLOAT", Type.DOUBLE)
- .put("BOOLEAN", Type.BOOLEAN)
- .put("TIMESTAMP", Type.LONG)
- .put("RECORD", Type.RECORD)
- .build();
- // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
- // is required, so it may not be null.
- String bqType = fieldSchema.getType();
- Type expectedAvroType = fieldMap.get(bqType);
- verify(
- avroType == expectedAvroType,
- "Expected Avro schema type %s, not %s, for BigQuery %s field %s",
- expectedAvroType,
- avroType,
- bqType,
- fieldSchema.getName());
- switch (fieldSchema.getType()) {
- case "STRING":
- // Avro will use a CharSequence to represent String objects, but it may not always use
- // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
- verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
- return v.toString();
- case "INTEGER":
- verify(v instanceof Long, "Expected Long, got %s", v.getClass());
- return ((Long) v).toString();
- case "FLOAT":
- verify(v instanceof Double, "Expected Double, got %s", v.getClass());
- return v;
- case "BOOLEAN":
- verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
- return v;
- case "TIMESTAMP":
- // TIMESTAMP data types are represented as Avro LONG types. They are converted back to
- // Strings with variable-precision (up to six digits) to match the JSON files export
- // by BigQuery.
- verify(v instanceof Long, "Expected Long, got %s", v.getClass());
- Double doubleValue = ((Long) v) / 1000000.0;
- return formatTimestamp(doubleValue.toString());
- case "RECORD":
- verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
- return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
- default:
- throw new UnsupportedOperationException(
- String.format(
- "Unexpected BigQuery field schema type %s for field named %s",
- fieldSchema.getType(),
- fieldSchema.getName()));
- }
- }
-
- @Nullable
- private static Object convertNullableField(
- Schema avroSchema, TableFieldSchema fieldSchema, Object v) {
- // NULLABLE fields are represented as an Avro Union of the corresponding type and "null".
- verify(
- avroSchema.getType() == Type.UNION,
- "Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s",
- avroSchema.getType(),
- fieldSchema.getName());
- List<Schema> unionTypes = avroSchema.getTypes();
- verify(
- unionTypes.size() == 2,
- "BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s",
- fieldSchema.getName(),
- unionTypes);
-
- if (v == null) {
- return null;
- }
-
- Type firstType = unionTypes.get(0).getType();
- if (!firstType.equals(Type.NULL)) {
- return convertRequiredField(firstType, fieldSchema, v);
- }
- return convertRequiredField(unionTypes.get(1).getType(), fieldSchema, v);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BaseExecutionContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BaseExecutionContext.java
deleted file mode 100644
index 6a0ccf3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BaseExecutionContext.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Base class for implementations of {@link ExecutionContext}.
- *
- * <p>A concrete subclass should implement {@link #createStepContext} to create the appropriate
- * {@link StepContext} implementation. Any {@code StepContext} created will
- * be cached for the lifetime of this {@link ExecutionContext}.
- *
- * <p>BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass
- * of {@link StepContext} from {@link #getOrCreateStepContext(String, String, StateSampler)} and
- * {@link #getAllStepContexts()} without forcing each subclass to override those methods merely to
- * narrow their return types, as in:
- * <pre>
- * &#64;Override
- * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) {
- *   return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...);
- * }
- * </pre>
- *
- * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return types of
- * {@link #createStepContext(String, String, StateSampler)},
- * {@link #getOrCreateStepContext(String, String, StateSampler)}, and {@link #getAllStepContexts()}
- * will be appropriately specialized.
- *
- * <p>The step-context cache is an unsynchronized {@link HashMap}; an instance is presumably meant
- * to be confined to a single thread (confirm with callers).
- *
- * @param <T> the concrete {@link ExecutionContext.StepContext} type created by this context
- */
-public abstract class BaseExecutionContext<T extends ExecutionContext.StepContext>
-    implements ExecutionContext {
-
-  /** Step contexts created so far, keyed by step name. */
-  private final Map<String, T> cachedStepContexts = new HashMap<>();
-
-  /**
-   * Implementations should override this to create the specific type
-   * of {@link StepContext} they need.
-   *
-   * <p>Called at most once per step name by
-   * {@link #getOrCreateStepContext(String, String, StateSampler)}.
-   *
-   * @param stepName the step the context is created for; also the cache key
-   * @param transformName the name of the transform executed by the step
-   * @param stateSampler the sampler passed through from the caller
-   */
-  protected abstract T createStepContext(
-      String stepName, String transformName, StateSampler stateSampler);
-
-  /**
-   * Returns the {@link StepContext} associated with the given step, creating it via
-   * {@link #createStepContext} and caching it the first time the step is requested.
-   *
-   * <p>Later calls for the same {@code stepName} return the cached context and ignore the
-   * {@code transformName} and {@code stateSampler} arguments.
-   */
-  @Override
-  public T getOrCreateStepContext(
-      String stepName, String transformName, StateSampler stateSampler) {
-    T context = cachedStepContexts.get(stepName);
-    if (context == null) {
-      context = createStepContext(stepName, transformName, stateSampler);
-      cachedStepContexts.put(stepName, context);
-    }
-    return context;
-  }
-
-  /**
-   * Returns an unmodifiable, live view of all of the {@link StepContext}s created so far.
-   */
-  @Override
-  public Collection<? extends T> getAllStepContexts() {
-    return Collections.unmodifiableCollection(cachedStepContexts.values());
-  }
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#output}
-   * is called. Does nothing by default.
-   */
-  @Override
-  public void noteOutput(WindowedValue<?> output) {}
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#sideOutput}
-   * is called. Does nothing by default.
-   */
-  @Override
-  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {}
-
-  /**
-   * Base class for implementations of {@link ExecutionContext.StepContext}.
-   *
-   * <p>Output notifications are forwarded to the enclosing {@link ExecutionContext}. To complete
-   * a concrete subclass, implement {@link #timerInternals} and {@link #stateInternals}.
-   */
-  public abstract static class StepContext implements ExecutionContext.StepContext {
-    private final ExecutionContext executionContext;
-    private final String stepName;
-    private final String transformName;
-
-    /**
-     * @param executionContext the context that output notifications are forwarded to
-     * @param stepName the name of this step
-     * @param transformName the name of the transform executed by this step
-     */
-    public StepContext(ExecutionContext executionContext, String stepName, String transformName) {
-      this.executionContext = executionContext;
-      this.stepName = stepName;
-      this.transformName = transformName;
-    }
-
-    @Override
-    public String getStepName() {
-      return stepName;
-    }
-
-    @Override
-    public String getTransformName() {
-      return transformName;
-    }
-
-    /** Forwards to {@link ExecutionContext#noteOutput} on the enclosing context. */
-    @Override
-    public void noteOutput(WindowedValue<?> output) {
-      executionContext.noteOutput(output);
-    }
-
-    /** Forwards to {@link ExecutionContext#noteSideOutput} on the enclosing context. */
-    @Override
-    public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
-      executionContext.noteSideOutput(tag, output);
-    }
-
-    /**
-     * Not supported by this base class.
-     *
-     * @throws UnsupportedOperationException always, unless overridden
-     */
-    @Override
-    public <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder,
-        W window, Coder<W> windowCoder) throws IOException {
-      throw new UnsupportedOperationException("Not implemented.");
-    }
-
-    @Override
-    public abstract StateInternals<?> stateInternals();
-
-    @Override
-    public abstract TimerInternals timerInternals();
-  }
-}