You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:29 UTC
[05/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java
deleted file mode 100644
index 00d3b3b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common.worker;
-
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * A StateSampler object may be used to obtain an approximate
- * breakdown of the time spent by an execution context in various
- * states, as a fraction of the total time. The sampling is taken at
- * regular intervals, with adjustment for scheduling delay.
- */
-@ThreadSafe
-public class StateSampler implements AutoCloseable {
-
- /** Different kinds of states. */
- public enum StateKind {
- /** IO, user code, etc. */
- USER,
- /** Reading/writing from/to shuffle service, etc. */
- FRAMEWORK
- }
-
- public static final long DEFAULT_SAMPLING_PERIOD_MS = 200;
-
- private final String prefix;
- private final CounterSet.AddCounterMutator counterSetMutator;
-
- /** Array of counters indexed by their state. */
- private ArrayList<Counter<Long>> countersByState = new ArrayList<>();
-
- /** Map of state name to state. */
- private Map<String, Integer> statesByName = new HashMap<>();
-
- /** Map of state id to kind. */
- private Map<Integer, StateKind> kindsByState = new HashMap<>();
-
- /** The current state. */
- private volatile int currentState;
-
- /** Special value of {@code currentState} that means we do not sample. */
- public static final int DO_NOT_SAMPLE = -1;
-
- /**
- * A counter that increments with each state transition. May be used
- * to detect a context being stuck in a state for some amount of
- * time.
- */
- private volatile long stateTransitionCount;
-
- /**
- * The timestamp (in nanoseconds) corresponding to the last time the
- * state was sampled (and recorded).
- */
- private long stateTimestampNs = 0;
-
- /** Using a fixed number of timers for all StateSampler objects. */
- private static final int NUM_EXECUTOR_THREADS = 16;
-
- private static final ScheduledExecutorService executorService =
- Executors.newScheduledThreadPool(NUM_EXECUTOR_THREADS,
- new ThreadFactoryBuilder().setDaemon(true).build());
-
- private Random rand = new Random();
-
- private List<SamplingCallback> callbacks = new ArrayList<>();
-
- private ScheduledFuture<?> invocationTriggerFuture = null;
-
- private ScheduledFuture<?> invocationFuture = null;
-
- /**
- * Constructs a new {@link StateSampler} that can be used to obtain
- * an approximate breakdown of the time spent by an execution
- * context in various states, as a fraction of the total time.
- *
- * @param prefix the prefix of the counter names for the states
- * @param counterSetMutator the {@link CounterSet.AddCounterMutator}
- * used to create a counter for each distinct state
- * @param samplingPeriodMs the sampling period in milliseconds
- */
- public StateSampler(String prefix,
- CounterSet.AddCounterMutator counterSetMutator,
- final long samplingPeriodMs) {
- this.prefix = prefix;
- this.counterSetMutator = counterSetMutator;
- currentState = DO_NOT_SAMPLE;
- scheduleSampling(samplingPeriodMs);
- }
-
- /**
- * Constructs a new {@link StateSampler} that can be used to obtain
- * an approximate breakdown of the time spent by an execution
- * context in various states, as a fraction of the total time.
- *
- * @param prefix the prefix of the counter names for the states
- * @param counterSetMutator the {@link CounterSet.AddCounterMutator}
- * used to create a counter for each distinct state
- */
- public StateSampler(String prefix,
- CounterSet.AddCounterMutator counterSetMutator) {
- this(prefix, counterSetMutator, DEFAULT_SAMPLING_PERIOD_MS);
- }
-
- /**
- * Called by the constructor to schedule sampling at the given period.
- *
- * <p>Should not be overridden by sub-classes unless they want to change
- * or disable the automatic sampling of state.
- */
- protected void scheduleSampling(final long samplingPeriodMs) {
- // Here "stratified sampling" is used, which makes sure that there's 1 uniformly chosen sampled
- // point in every bucket of samplingPeriodMs, to prevent pathological behavior in case some
- // states happen to occur at a similar period.
- // The current implementation uses a fixed-rate timer with a period samplingPeriodMs as a
- // trampoline to a one-shot random timer which fires with a random delay within
- // samplingPeriodMs.
- stateTimestampNs = System.nanoTime();
- invocationTriggerFuture =
- executorService.scheduleAtFixedRate(
- new Runnable() {
- @Override
- public void run() {
- long delay = rand.nextInt((int) samplingPeriodMs);
- synchronized (StateSampler.this) {
- if (invocationFuture != null) {
- invocationFuture.cancel(false);
- }
- invocationFuture =
- executorService.schedule(
- new Runnable() {
- @Override
- public void run() {
- StateSampler.this.run();
- }
- },
- delay,
- TimeUnit.MILLISECONDS);
- }
- }
- },
- 0,
- samplingPeriodMs,
- TimeUnit.MILLISECONDS);
- }
-
- public synchronized void run() {
- long startTimestampNs = System.nanoTime();
- int state = currentState;
- if (state != DO_NOT_SAMPLE) {
- StateKind kind = null;
- long elapsedMs = TimeUnit.NANOSECONDS.toMillis(startTimestampNs - stateTimestampNs);
- kind = kindsByState.get(state);
- countersByState.get(state).addValue(elapsedMs);
- // Invoke all callbacks.
- for (SamplingCallback c : callbacks) {
- c.run(state, kind, elapsedMs);
- }
- }
- stateTimestampNs = startTimestampNs;
- }
-
- @Override
- public synchronized void close() {
- currentState = DO_NOT_SAMPLE;
- if (invocationTriggerFuture != null) {
- invocationTriggerFuture.cancel(false);
- }
- if (invocationFuture != null) {
- invocationFuture.cancel(false);
- }
- }
-
- /**
- * Returns the state associated with a name; creating a new state if
- * necessary. Using states instead of state names during state
- * transitions is done for efficiency.
- *
- * @name the name for the state
- * @kind kind of the state, see {#code StateKind}
- * @return the state associated with the state name
- */
- public int stateForName(String name, StateKind kind) {
- if (name.isEmpty()) {
- return DO_NOT_SAMPLE;
- }
-
- synchronized (this) {
- Integer state = statesByName.get(name);
- if (state == null) {
- String counterName = prefix + name + "-msecs";
- Counter<Long> counter = counterSetMutator.addCounter(
- Counter.longs(counterName, Counter.AggregationKind.SUM));
- state = countersByState.size();
- statesByName.put(name, state);
- countersByState.add(counter);
- kindsByState.put(state, kind);
- }
- StateKind originalKind = kindsByState.get(state);
- if (originalKind != kind) {
- throw new IllegalArgumentException(
- "for state named " + name
- + ", requested kind " + kind + " different from the original kind " + originalKind);
- }
- return state;
- }
- }
-
- /**
- * An internal class for representing StateSampler information
- * typically used for debugging.
- */
- public static class StateSamplerInfo {
- public final String state;
- public final Long transitionCount;
- public final Long stateDurationMillis;
-
- public StateSamplerInfo(String state, Long transitionCount,
- Long stateDurationMillis) {
- this.state = state;
- this.transitionCount = transitionCount;
- this.stateDurationMillis = stateDurationMillis;
- }
- }
-
- /**
- * Returns information about the current state of this state sampler
- * into a {@link StateSamplerInfo} object, or null if sampling is
- * not turned on.
- *
- * @return information about this state sampler or null if sampling is off
- */
- public synchronized StateSamplerInfo getInfo() {
- return currentState == DO_NOT_SAMPLE ? null
- : new StateSamplerInfo(countersByState.get(currentState).getName(),
- stateTransitionCount, null);
- }
-
- /**
- * Returns the current state of this state sampler.
- */
- public int getCurrentState() {
- return currentState;
- }
-
- /**
- * Sets the current thread state.
- *
- * @param state the new state to transition to
- * @return the previous state
- */
- public int setState(int state) {
- // Updates to stateTransitionCount are always done by the same
- // thread, making the non-atomic volatile update below safe. The
- // count is updated first to avoid incorrectly attributing
- // stuckness occuring in an old state to the new state.
- long previousStateTransitionCount = this.stateTransitionCount;
- this.stateTransitionCount = previousStateTransitionCount + 1;
- int previousState = currentState;
- currentState = state;
- return previousState;
- }
-
- /**
- * Sets the current thread state.
- *
- * @param name the name of the new state to transition to
- * @param kind kind of the new state
- * @return the previous state
- */
- public int setState(String name, StateKind kind) {
- return setState(stateForName(name, kind));
- }
-
- /**
- * Returns an AutoCloseable {@link ScopedState} that will perform a
- * state transition to the given state, and will automatically reset
- * the state to the prior state upon closing.
- *
- * @param state the new state to transition to
- * @return a {@link ScopedState} that automatically resets the state
- * to the prior state
- */
- public ScopedState scopedState(int state) {
- return new ScopedState(this, setState(state));
- }
-
- /**
- * Add a callback to the sampler.
- * The callbacks will be executed sequentially upon {@link StateSampler#run}.
- */
- public synchronized void addSamplingCallback(SamplingCallback callback) {
- callbacks.add(callback);
- }
-
- /** Get the counter prefix associated with this sampler. */
- public String getPrefix() {
- return prefix;
- }
-
- /**
- * A nested class that is used to account for states and state
- * transitions based on lexical scopes.
- *
- * <p>Thread-safe.
- */
- public class ScopedState implements AutoCloseable {
- private StateSampler sampler;
- private int previousState;
-
- private ScopedState(StateSampler sampler, int previousState) {
- this.sampler = sampler;
- this.previousState = previousState;
- }
-
- @Override
- public void close() {
- sampler.setState(previousState);
- }
- }
-
- /**
- * Callbacks which supposed to be called sequentially upon {@link StateSampler#run}.
- * They should be registered via {@link #addSamplingCallback}.
- */
- public static interface SamplingCallback {
- /**
- * The entrance method of the callback, it is called in {@link StateSampler#run},
- * once per sample. This method should be thread safe.
- *
- * @param state The state of the StateSampler at the time of sample.
- * @param kind The kind associated with the state, see {@link StateKind}.
- * @param elapsedMs Milliseconds since last sample.
- */
- public void run(int state, StateKind kind, long elapsedMs);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/package-info.java
deleted file mode 100644
index c3da9ed..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/** Defines utilities used to implement the harness that runs user code. **/
-package com.google.cloud.dataflow.sdk.util.common.worker;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java
deleted file mode 100644
index f72ba4c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util.gcsfs;
-
-import com.google.api.services.storage.model.StorageObject;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.file.FileSystem;
-import java.nio.file.LinkOption;
-import java.nio.file.Path;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Iterator;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-/**
- * Implements the Java NIO {@link Path} API for Google Cloud Storage paths.
- *
- * <p>GcsPath uses a slash ('/') as a directory separator. Below is
- * a summary of how slashes are treated:
- * <ul>
- * <li> A GCS bucket may not contain a slash. An object may contain zero or
- * more slashes.
- * <li> A trailing slash always indicates a directory, which is compliant
- * with POSIX.1-2008.
- * <li> Slashes separate components of a path. Empty components are allowed,
- * these are represented as repeated slashes. An empty component always
- * refers to a directory, and always ends in a slash.
- * <li> {@link #getParent()}} always returns a path ending in a slash, as the
- * parent of a GcsPath is always a directory.
- * <li> Use {@link #resolve(String)} to append elements to a GcsPath -- this
- * applies the rules consistently and is highly recommended over any
- * custom string concatenation.
- * </ul>
- *
- * <p>GcsPath treats all GCS objects and buckets as belonging to the same
- * filesystem, so the root of a GcsPath is the GcsPath bucket="", object="".
- *
- * <p>Relative paths are not associated with any bucket. This matches common
- * treatment of Path in which relative paths can be constructed from one
- * filesystem and appended to another filesystem.
- *
- * @see <a href=
- * "http://docs.oracle.com/javase/tutorial/essential/io/pathOps.html"
- * >Java Tutorials: Path Operations</a>
- */
-public class GcsPath implements Path {
-
- public static final String SCHEME = "gs";
-
- /**
- * Creates a GcsPath from a URI.
- *
- * <p>The URI must be in the form {@code gs://[bucket]/[path]}, and may not
- * contain a port, user info, a query, or a fragment.
- */
- public static GcsPath fromUri(URI uri) {
- Preconditions.checkArgument(uri.getScheme().equalsIgnoreCase(SCHEME),
- "URI: %s is not a GCS URI", uri);
- Preconditions.checkArgument(uri.getPort() == -1,
- "GCS URI may not specify port: %s (%i)", uri, uri.getPort());
- Preconditions.checkArgument(
- Strings.isNullOrEmpty(uri.getUserInfo()),
- "GCS URI may not specify userInfo: %s (%s)", uri, uri.getUserInfo());
- Preconditions.checkArgument(
- Strings.isNullOrEmpty(uri.getQuery()),
- "GCS URI may not specify query: %s (%s)", uri, uri.getQuery());
- Preconditions.checkArgument(
- Strings.isNullOrEmpty(uri.getFragment()),
- "GCS URI may not specify fragment: %s (%s)", uri, uri.getFragment());
-
- return fromUri(uri.toString());
- }
-
- /**
- * Pattern that is used to parse a GCS URL.
- *
- * <p>This is used to separate the components. Verification is handled
- * separately.
- */
- public static final Pattern GCS_URI =
- Pattern.compile("(?<SCHEME>[^:]+)://(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
-
- /**
- * Creates a GcsPath from a URI in string form.
- *
- * <p>This does not use URI parsing, which means it may accept patterns that
- * the URI parser would not accept.
- */
- public static GcsPath fromUri(String uri) {
- Matcher m = GCS_URI.matcher(uri);
- Preconditions.checkArgument(m.matches(), "Invalid GCS URI: %s", uri);
-
- Preconditions.checkArgument(m.group("SCHEME").equalsIgnoreCase(SCHEME),
- "URI: %s is not a GCS URI", uri);
- return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT"));
- }
-
- /**
- * Pattern that is used to parse a GCS resource name.
- */
- private static final Pattern GCS_RESOURCE_NAME =
- Pattern.compile("storage.googleapis.com/(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
-
- /**
- * Creates a GcsPath from a OnePlatform resource name in string form.
- */
- public static GcsPath fromResourceName(String name) {
- Matcher m = GCS_RESOURCE_NAME.matcher(name);
- Preconditions.checkArgument(m.matches(), "Invalid GCS resource name: %s", name);
-
- return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT"));
- }
-
- /**
- * Creates a GcsPath from a {@linkplain StorageObject}.
- */
- public static GcsPath fromObject(StorageObject object) {
- return new GcsPath(null, object.getBucket(), object.getName());
- }
-
- /**
- * Creates a GcsPath from bucket and object components.
- *
- * <p>A GcsPath without a bucket name is treated as a relative path, which
- * is a path component with no linkage to the root element. This is similar
- * to a Unix path that does not begin with the root marker (a slash).
- * GCS has different naming constraints and APIs for working with buckets and
- * objects, so these two concepts are kept separate to avoid accidental
- * attempts to treat objects as buckets, or vice versa, as much as possible.
- *
- * <p>A GcsPath without an object name is a bucket reference.
- * A bucket is always a directory, which could be used to lookup or add
- * files to a bucket, but could not be opened as a file.
- *
- * <p>A GcsPath containing neither bucket or object names is treated as
- * the root of the GCS filesystem. A listing on the root element would return
- * the buckets available to the user.
- *
- * <p>If {@code null} is passed as either parameter, it is converted to an
- * empty string internally for consistency. There is no distinction between
- * an empty string and a {@code null}, as neither are allowed by GCS.
- *
- * @param bucket a GCS bucket name, or none ({@code null} or an empty string)
- * if the object is not associated with a bucket
- * (e.g. relative paths or the root node).
- * @param object a GCS object path, or none ({@code null} or an empty string)
- * for no object.
- */
- public static GcsPath fromComponents(@Nullable String bucket,
- @Nullable String object) {
- return new GcsPath(null, bucket, object);
- }
-
- @Nullable
- private FileSystem fs;
- @Nonnull
- private final String bucket;
- @Nonnull
- private final String object;
-
- /**
- * Constructs a GcsPath.
- *
- * @param fs the associated FileSystem, if any
- * @param bucket the associated bucket, or none ({@code null} or an empty
- * string) for a relative path component
- * @param object the object, which is a fully-qualified object name if bucket
- * was also provided, or none ({@code null} or an empty string)
- * for no object
- * @throws java.lang.IllegalArgumentException if the bucket of object names
- * are invalid.
- */
- public GcsPath(@Nullable FileSystem fs,
- @Nullable String bucket,
- @Nullable String object) {
- if (bucket == null) {
- bucket = "";
- }
- Preconditions.checkArgument(!bucket.contains("/"),
- "GCS bucket may not contain a slash");
- Preconditions
- .checkArgument(bucket.isEmpty()
- || bucket.matches("[a-z0-9][-_a-z0-9.]+[a-z0-9]"),
- "GCS bucket names must contain only lowercase letters, numbers, "
- + "dashes (-), underscores (_), and dots (.). Bucket names "
- + "must start and end with a number or letter. "
- + "See https://developers.google.com/storage/docs/bucketnaming "
- + "for more details. Bucket name: " + bucket);
-
- if (object == null) {
- object = "";
- }
- Preconditions.checkArgument(
- object.indexOf('\n') < 0 && object.indexOf('\r') < 0,
- "GCS object names must not contain Carriage Return or "
- + "Line Feed characters.");
-
- this.fs = fs;
- this.bucket = bucket;
- this.object = object;
- }
-
- /**
- * Returns the bucket name associated with this GCS path, or an empty string
- * if this is a relative path component.
- */
- public String getBucket() {
- return bucket;
- }
-
- /**
- * Returns the object name associated with this GCS path, or an empty string
- * if no object is specified.
- */
- public String getObject() {
- return object;
- }
-
- public void setFileSystem(FileSystem fs) {
- this.fs = fs;
- }
-
- @Override
- public FileSystem getFileSystem() {
- return fs;
- }
-
- // Absolute paths are those that have a bucket and the root path.
- @Override
- public boolean isAbsolute() {
- return !bucket.isEmpty() || object.isEmpty();
- }
-
- @Override
- public GcsPath getRoot() {
- return new GcsPath(fs, "", "");
- }
-
- @Override
- public GcsPath getFileName() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Returns the <em>parent path</em>, or {@code null} if this path does not
- * have a parent.
- *
- * <p>Returns a path that ends in '/', as the parent path always refers to
- * a directory.
- */
- @Override
- public GcsPath getParent() {
- if (bucket.isEmpty() && object.isEmpty()) {
- // The root path has no parent, by definition.
- return null;
- }
-
- if (object.isEmpty()) {
- // A GCS bucket. All buckets come from a common root.
- return getRoot();
- }
-
- // Skip last character, in case it is a trailing slash.
- int i = object.lastIndexOf('/', object.length() - 2);
- if (i <= 0) {
- if (bucket.isEmpty()) {
- // Relative paths are not attached to the root node.
- return null;
- }
- return new GcsPath(fs, bucket, "");
- }
-
- // Retain trailing slash.
- return new GcsPath(fs, bucket, object.substring(0, i + 1));
- }
-
- @Override
- public int getNameCount() {
- int count = bucket.isEmpty() ? 0 : 1;
- if (object.isEmpty()) {
- return count;
- }
-
- // Add another for each separator found.
- int index = -1;
- while ((index = object.indexOf('/', index + 1)) != -1) {
- count++;
- }
-
- return object.endsWith("/") ? count : count + 1;
- }
-
- @Override
- public GcsPath getName(int count) {
- Preconditions.checkArgument(count >= 0);
-
- Iterator<Path> iterator = iterator();
- for (int i = 0; i < count; ++i) {
- Preconditions.checkArgument(iterator.hasNext());
- iterator.next();
- }
-
- Preconditions.checkArgument(iterator.hasNext());
- return (GcsPath) iterator.next();
- }
-
- @Override
- public GcsPath subpath(int beginIndex, int endIndex) {
- Preconditions.checkArgument(beginIndex >= 0);
- Preconditions.checkArgument(endIndex > beginIndex);
-
- Iterator<Path> iterator = iterator();
- for (int i = 0; i < beginIndex; ++i) {
- Preconditions.checkArgument(iterator.hasNext());
- iterator.next();
- }
-
- GcsPath path = null;
- while (beginIndex < endIndex) {
- Preconditions.checkArgument(iterator.hasNext());
- if (path == null) {
- path = (GcsPath) iterator.next();
- } else {
- path = path.resolve(iterator.next());
- }
- ++beginIndex;
- }
-
- return path;
- }
-
- @Override
- public boolean startsWith(Path other) {
- if (other instanceof GcsPath) {
- GcsPath gcsPath = (GcsPath) other;
- return startsWith(gcsPath.bucketAndObject());
- } else {
- return startsWith(other.toString());
- }
- }
-
- @Override
- public boolean startsWith(String prefix) {
- return bucketAndObject().startsWith(prefix);
- }
-
- @Override
- public boolean endsWith(Path other) {
- if (other instanceof GcsPath) {
- GcsPath gcsPath = (GcsPath) other;
- return endsWith(gcsPath.bucketAndObject());
- } else {
- return endsWith(other.toString());
- }
- }
-
- @Override
- public boolean endsWith(String suffix) {
- return bucketAndObject().endsWith(suffix);
- }
-
- // TODO: support "." and ".." path components?
- @Override
- public GcsPath normalize() {
- return this;
- }
-
- @Override
- public GcsPath resolve(Path other) {
- if (other instanceof GcsPath) {
- GcsPath path = (GcsPath) other;
- if (path.isAbsolute()) {
- return path;
- } else {
- return resolve(path.getObject());
- }
- } else {
- return resolve(other.toString());
- }
- }
-
- @Override
- public GcsPath resolve(String other) {
- if (bucket.isEmpty() && object.isEmpty()) {
- // Resolve on a root path is equivalent to looking up a bucket and object.
- other = SCHEME + "://" + other;
- }
-
- if (other.startsWith(SCHEME + "://")) {
- GcsPath path = GcsPath.fromUri(other);
- path.setFileSystem(getFileSystem());
- return path;
- }
-
- if (other.isEmpty()) {
- // An empty component MUST refer to a directory.
- other = "/";
- }
-
- if (object.isEmpty()) {
- return new GcsPath(fs, bucket, other);
- } else if (object.endsWith("/")) {
- return new GcsPath(fs, bucket, object + other);
- } else {
- return new GcsPath(fs, bucket, object + "/" + other);
- }
- }
-
- @Override
- public Path resolveSibling(Path other) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Path resolveSibling(String other) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Path relativize(Path other) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public GcsPath toAbsolutePath() {
- return this;
- }
-
- @Override
- public GcsPath toRealPath(LinkOption... options) throws IOException {
- return this;
- }
-
- @Override
- public File toFile() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public WatchKey register(WatchService watcher, WatchEvent.Kind<?>[] events,
- WatchEvent.Modifier... modifiers) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public WatchKey register(WatchService watcher, WatchEvent.Kind<?>... events)
- throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator<Path> iterator() {
- return new NameIterator(fs, !bucket.isEmpty(), bucketAndObject());
- }
-
- private static class NameIterator implements Iterator<Path> {
- private final FileSystem fs;
- private boolean fullPath;
- private String name;
-
- NameIterator(FileSystem fs, boolean fullPath, String name) {
- this.fs = fs;
- this.fullPath = fullPath;
- this.name = name;
- }
-
- @Override
- public boolean hasNext() {
- return !Strings.isNullOrEmpty(name);
- }
-
- @Override
- public GcsPath next() {
- int i = name.indexOf('/');
- String component;
- if (i >= 0) {
- component = name.substring(0, i);
- name = name.substring(i + 1);
- } else {
- component = name;
- name = null;
- }
- if (fullPath) {
- fullPath = false;
- return new GcsPath(fs, component, "");
- } else {
- // Relative paths have no bucket.
- return new GcsPath(fs, "", component);
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- @Override
- public int compareTo(Path other) {
- if (!(other instanceof GcsPath)) {
- throw new ClassCastException();
- }
-
- GcsPath path = (GcsPath) other;
- int b = bucket.compareTo(path.bucket);
- if (b != 0) {
- return b;
- }
-
- // Compare a component at a time, so that the separator char doesn't
- // get compared against component contents. Eg, "a/b" < "a-1/b".
- Iterator<Path> left = iterator();
- Iterator<Path> right = path.iterator();
-
- while (left.hasNext() && right.hasNext()) {
- String leftStr = left.next().toString();
- String rightStr = right.next().toString();
- int c = leftStr.compareTo(rightStr);
- if (c != 0) {
- return c;
- }
- }
-
- if (!left.hasNext() && !right.hasNext()) {
- return 0;
- } else {
- return left.hasNext() ? 1 : -1;
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- GcsPath paths = (GcsPath) o;
- return bucket.equals(paths.bucket) && object.equals(paths.object);
- }
-
- @Override
- public int hashCode() {
- int result = bucket.hashCode();
- result = 31 * result + object.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- if (!isAbsolute()) {
- return object;
- }
- StringBuilder sb = new StringBuilder();
- sb.append(SCHEME)
- .append("://");
- if (!bucket.isEmpty()) {
- sb.append(bucket)
- .append('/');
- }
- sb.append(object);
- return sb.toString();
- }
-
- // TODO: Consider using resource names for all GCS paths used by the SDK.
- public String toResourceName() {
- StringBuilder sb = new StringBuilder();
- sb.append("storage.googleapis.com/");
- if (!bucket.isEmpty()) {
- sb.append(bucket).append('/');
- }
- sb.append(object);
- return sb.toString();
- }
-
- @Override
- public URI toUri() {
- try {
- return new URI(SCHEME, "//" + bucketAndObject(), null);
- } catch (URISyntaxException e) {
- throw new RuntimeException("Unable to create URI for GCS path " + this);
- }
- }
-
- private String bucketAndObject() {
- if (bucket.isEmpty()) {
- return object;
- } else {
- return bucket + "/" + object;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/package-info.java
deleted file mode 100644
index 2f57938..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/** Defines utilities used to interact with Google Cloud Storage. **/
-package com.google.cloud.dataflow.sdk.util.gcsfs;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/package-info.java
deleted file mode 100644
index c92adab..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/** Defines utilities used by the Dataflow SDK. **/
-package com.google.cloud.dataflow.sdk.util;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/AccumulatorCombiningState.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/AccumulatorCombiningState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/AccumulatorCombiningState.java
deleted file mode 100644
index 0d78b13..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/AccumulatorCombiningState.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-
-/**
- * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
- * to {@link CombiningState} that includes the {@code AccumT} type.
- *
- * @param <InputT> the type of values added to the state
- * @param <AccumT> the type of accumulator
- * @param <OutputT> the type of value extracted from the state
- */
-public interface AccumulatorCombiningState<InputT, AccumT, OutputT>
- extends CombiningState<InputT, OutputT> {
-
- /**
- * Read the merged accumulator for this combining value. It is implied that reading the
- * state involves reading the accumulator, so {@link #readLater} is sufficient to prefetch for
- * this.
- */
- AccumT getAccum();
-
- /**
- * Add an accumulator to this combining value. Depending on implementation this may immediately
- * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
- */
- void addAccum(AccumT accum);
-
- /**
- * Merge the given accumulators according to the underlying combiner.
- */
- AccumT mergeAccumulators(Iterable<AccumT> accumulators);
-
- @Override
- AccumulatorCombiningState<InputT, AccumT, OutputT> readLater();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/BagState.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/BagState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/BagState.java
deleted file mode 100644
index 363e480..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/BagState.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-/**
- * State containing a bag of values. Items can be added to the bag and the contents read out.
- *
- * @param <T> The type of elements in the bag.
- */
-public interface BagState<T> extends CombiningState<T, Iterable<T>> {
- @Override
- BagState<T> readLater();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CombiningState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CombiningState.java
deleted file mode 100644
index 673bebb..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CombiningState.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-
-/**
- * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
- * {@code OutputT} value.
- *
- * @param <InputT> the type of values added to the state
- * @param <OutputT> the type of value extracted from the state
- */
-public interface CombiningState<InputT, OutputT> extends ReadableState<OutputT>, State {
- /**
- * Add a value to the buffer.
- */
- void add(InputT value);
-
- /**
- * Return true if this state is empty.
- */
- ReadableState<Boolean> isEmpty();
-
- @Override
- CombiningState<InputT, OutputT> readLater();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CopyOnAccessInMemoryStateInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CopyOnAccessInMemoryStateInternals.java
deleted file mode 100644
index 3683b74..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CopyOnAccessInMemoryStateInternals.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.cloud.dataflow.sdk.util.CombineFnUtil;
-import com.google.cloud.dataflow.sdk.util.state.InMemoryStateInternals.InMemoryState;
-import com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * {@link StateInternals} built on top of an underlying {@link StateTable} that contains instances
- * of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is
- * accessed, an independent copy will be created within this table.
- */
-public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> {
- private final K key;
- private final CopyOnAccessInMemoryStateTable<K> table;
-
- /**
- * Creates a new {@link CopyOnAccessInMemoryStateInternals} with the underlying (possibly null)
- * StateInternals.
- */
- public static <K> CopyOnAccessInMemoryStateInternals<K> withUnderlying(
- K key, @Nullable CopyOnAccessInMemoryStateInternals<K> underlying) {
- return new CopyOnAccessInMemoryStateInternals<K>(key, underlying);
- }
-
- private CopyOnAccessInMemoryStateInternals(
- K key, CopyOnAccessInMemoryStateInternals<K> underlying) {
- this.key = key;
- table =
- new CopyOnAccessInMemoryStateTable<K>(key, underlying == null ? null : underlying.table);
- }
-
- /**
- * Ensures this {@link CopyOnAccessInMemoryStateInternals} is complete. Other copies of state for
- * the same Step and Key may be discarded after invoking this method.
- *
- * <p>For each {@link StateNamespace}, for each {@link StateTag address} in that namespace that
- * has not been bound in this {@link CopyOnAccessInMemoryStateInternals}, put a reference to that
- * state within this {@link StateInternals}.
- *
- * <p>Additionally, stores the {@link WatermarkHoldState} with the earliest time bound in the
- * state table after the commit is completed, enabling calls to
- * {@link #getEarliestWatermarkHold()}.
- *
- * @return this {@link CopyOnAccessInMemoryStateInternals}
- */
- public CopyOnAccessInMemoryStateInternals<K> commit() {
- table.commit();
- return this;
- }
-
- /**
- * Gets the earliest Watermark Hold present in this table.
- *
- * <p>Must be called after this state has been committed. Will throw an
- * {@link IllegalStateException} if the state has not been committed.
- */
- public Instant getEarliestWatermarkHold() {
- // After commit, the watermark hold is always present, but may be
- // BoundedWindow#TIMESTAMP_MAX_VALUE if there is no hold set.
- checkState(
- table.earliestWatermarkHold.isPresent(),
- "Can't get the earliest watermark hold in a %s before it is committed",
- getClass().getSimpleName());
- return table.earliestWatermarkHold.get();
- }
-
- @Override
- public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
- return state(namespace, address, StateContexts.nullContext());
- }
-
- @Override
- public <T extends State> T state(
- StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
- return table.get(namespace, address, c);
- }
-
- @Override
- public K getKey() {
- return key;
- }
-
- public boolean isEmpty() {
- return Iterables.isEmpty(table.values());
- }
-
- /**
- * A {@link StateTable} that, when a value is retrieved with
- * {@link StateTable#get(StateNamespace, StateTag)}, first attempts to obtain a copy of existing
- * {@link State} from an underlying {@link StateTable}.
- */
- private static class CopyOnAccessInMemoryStateTable<K> extends StateTable<K> {
- private final K key;
- private Optional<StateTable<K>> underlying;
-
- /**
- * The StateBinderFactory currently in use by this {@link CopyOnAccessInMemoryStateTable}.
- *
- * <p>There are three {@link StateBinderFactory} implementations used by the {@link
- * CopyOnAccessInMemoryStateTable}.
- * <ul>
- * <li>The default {@link StateBinderFactory} is a {@link CopyOnBindBinderFactory}, allowing
- * the table to copy any existing {@link State} values to this {@link StateTable} from the
- * underlying table when accessed, at which point mutations will not be visible to the
- * underlying table - effectively a "Copy by Value" binder.</li>
- * <li>During the execution of the {@link #commit()} method, this is a
- * {@link ReadThroughBinderFactory}, which copies the references to the existing
- * {@link State} objects to this {@link StateTable}.</li>
- * <li>After the execution of the {@link #commit()} method, this is an
- * instance of {@link InMemoryStateBinderFactory}, which constructs new instances of state
- * when a {@link StateTag} is bound.</li>
- * </ul>
- */
- private StateBinderFactory<K> binderFactory;
-
- /**
- * The earliest watermark hold in this table.
- */
- private Optional<Instant> earliestWatermarkHold;
-
- public CopyOnAccessInMemoryStateTable(K key, StateTable<K> underlying) {
- this.key = key;
- this.underlying = Optional.fromNullable(underlying);
- binderFactory = new CopyOnBindBinderFactory<>(key, this.underlying);
- earliestWatermarkHold = Optional.absent();
- }
-
- /**
- * Copies all values in the underlying table to this table, then discards the underlying table.
- *
- * <p>If there is an underlying table, this replaces the existing
- * {@link CopyOnBindBinderFactory} with a {@link ReadThroughBinderFactory}, then reads all of
- * the values in the existing table, binding the state values to this table. The old StateTable
- * should be discarded after the call to {@link #commit()}.
- *
- * <p>After copying all of the existing values, replace the binder factory with an instance of
- * {@link InMemoryStateBinderFactory} to construct new values, since all existing values
- * are bound in this {@link StateTable table} and this table represents the canonical state.
- */
- private void commit() {
- Instant earliestHold = getEarliestWatermarkHold();
- if (underlying.isPresent()) {
- ReadThroughBinderFactory<K> readThroughBinder =
- new ReadThroughBinderFactory<>(underlying.get());
- binderFactory = readThroughBinder;
- Instant earliestUnderlyingHold = readThroughBinder.readThroughAndGetEarliestHold(this);
- if (earliestUnderlyingHold.isBefore(earliestHold)) {
- earliestHold = earliestUnderlyingHold;
- }
- }
- earliestWatermarkHold = Optional.of(earliestHold);
- clearEmpty();
- binderFactory = new InMemoryStateBinderFactory<>(key);
- underlying = Optional.absent();
- }
-
- /**
- * Get the earliest watermark hold in this table. Ignores the contents of any underlying table.
- */
- private Instant getEarliestWatermarkHold() {
- Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE;
- for (State existingState : this.values()) {
- if (existingState instanceof WatermarkHoldState) {
- Instant hold = ((WatermarkHoldState<?>) existingState).read();
- if (hold != null && hold.isBefore(earliest)) {
- earliest = hold;
- }
- }
- }
- return earliest;
- }
-
- /**
- * Clear all empty {@link StateNamespace StateNamespaces} from this table. If all states are
- * empty, clear the entire table.
- *
- * <p>Because {@link InMemoryState} is not removed from the {@link StateTable} after it is
- * cleared, in case contents are modified after being cleared, the table must be explicitly
- * checked to ensure that it contains state and removed if not (otherwise we may never use
- * the table again).
- */
- private void clearEmpty() {
- Collection<StateNamespace> emptyNamespaces = new HashSet<>(this.getNamespacesInUse());
- for (StateNamespace namespace : this.getNamespacesInUse()) {
- for (State existingState : this.getTagsInUse(namespace).values()) {
- if (!((InMemoryState<?>) existingState).isCleared()) {
- emptyNamespaces.remove(namespace);
- break;
- }
- }
- }
- for (StateNamespace empty : emptyNamespaces) {
- this.clearNamespace(empty);
- }
- }
-
- @Override
- protected StateBinder<K> binderForNamespace(final StateNamespace namespace, StateContext<?> c) {
- return binderFactory.forNamespace(namespace, c);
- }
-
- private static interface StateBinderFactory<K> {
- StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c);
- }
-
- /**
- * {@link StateBinderFactory} that creates a copy of any existing state when the state is bound.
- */
- private static class CopyOnBindBinderFactory<K> implements StateBinderFactory<K> {
- private final K key;
- private final Optional<StateTable<K>> underlying;
-
- public CopyOnBindBinderFactory(K key, Optional<StateTable<K>> underlying) {
- this.key = key;
- this.underlying = underlying;
- }
-
- private boolean containedInUnderlying(StateNamespace namespace, StateTag<? super K, ?> tag) {
- return underlying.isPresent() && underlying.get().isNamespaceInUse(namespace)
- && underlying.get().getTagsInUse(namespace).containsKey(tag);
- }
-
- @Override
- public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
- return new StateBinder<K>() {
- @Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
- if (containedInUnderlying(namespace, address)) {
- @SuppressWarnings("unchecked")
- InMemoryState<? extends WatermarkHoldState<W>> existingState =
- (InMemoryStateInternals.InMemoryState<? extends WatermarkHoldState<W>>)
- underlying.get().get(namespace, address, c);
- return existingState.copy();
- } else {
- return new InMemoryStateInternals.InMemoryWatermarkHold<>(
- outputTimeFn);
- }
- }
-
- @Override
- public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
- if (containedInUnderlying(namespace, address)) {
- @SuppressWarnings("unchecked")
- InMemoryState<? extends ValueState<T>> existingState =
- (InMemoryStateInternals.InMemoryState<? extends ValueState<T>>)
- underlying.get().get(namespace, address, c);
- return existingState.copy();
- } else {
- return new InMemoryStateInternals.InMemoryValue<>();
- }
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
- if (containedInUnderlying(namespace, address)) {
- @SuppressWarnings("unchecked")
- InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, OutputT>>
- existingState = (
- InMemoryStateInternals
- .InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT,
- OutputT>>) underlying.get().get(namespace, address, c);
- return existingState.copy();
- } else {
- return new InMemoryStateInternals.InMemoryCombiningValue<>(
- key, combineFn.asKeyedFn());
- }
- }
-
- @Override
- public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
- if (containedInUnderlying(namespace, address)) {
- @SuppressWarnings("unchecked")
- InMemoryState<? extends BagState<T>> existingState =
- (InMemoryStateInternals.InMemoryState<? extends BagState<T>>)
- underlying.get().get(namespace, address, c);
- return existingState.copy();
- } else {
- return new InMemoryStateInternals.InMemoryBag<>();
- }
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- if (containedInUnderlying(namespace, address)) {
- @SuppressWarnings("unchecked")
- InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, OutputT>>
- existingState = (
- InMemoryStateInternals
- .InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT,
- OutputT>>) underlying.get().get(namespace, address, c);
- return existingState.copy();
- } else {
- return new InMemoryStateInternals.InMemoryCombiningValue<>(key, combineFn);
- }
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return bindKeyedCombiningValue(
- address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
- }
- };
- }
- }
-
- /**
- * {@link StateBinderFactory} that reads directly from the underlying table. Used during calls
- * to {@link CopyOnAccessInMemoryStateTable#commit()} to read all values from
- * the underlying table.
- */
- private static class ReadThroughBinderFactory<K> implements StateBinderFactory<K> {
- private final StateTable<K> underlying;
-
- public ReadThroughBinderFactory(StateTable<K> underlying) {
- this.underlying = underlying;
- }
-
- public Instant readThroughAndGetEarliestHold(StateTable<K> readTo) {
- Instant earliestHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
- for (StateNamespace namespace : underlying.getNamespacesInUse()) {
- for (Map.Entry<StateTag<? super K, ?>, ? extends State> existingState :
- underlying.getTagsInUse(namespace).entrySet()) {
- if (!((InMemoryState<?>) existingState.getValue()).isCleared()) {
- // Only read through non-cleared values to ensure that completed windows are
- // eventually discarded, and remember the earliest watermark hold from among those
- // values.
- State state =
- readTo.get(namespace, existingState.getKey(), StateContexts.nullContext());
- if (state instanceof WatermarkHoldState) {
- Instant hold = ((WatermarkHoldState<?>) state).read();
- if (hold != null && hold.isBefore(earliestHold)) {
- earliestHold = hold;
- }
- }
- }
- }
- }
- return earliestHold;
- }
-
- @Override
- public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
- return new StateBinder<K>() {
- @Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
- return underlying.get(namespace, address, c);
- }
-
- @Override
- public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
- return underlying.get(namespace, address, c);
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
- return underlying.get(namespace, address, c);
- }
-
- @Override
- public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
- return underlying.get(namespace, address, c);
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return underlying.get(namespace, address, c);
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return bindKeyedCombiningValue(
- address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
- }
- };
- }
- }
-
- private static class InMemoryStateBinderFactory<K> implements StateBinderFactory<K> {
- private final K key;
-
- public InMemoryStateBinderFactory(K key) {
- this.key = key;
- }
-
- @Override
- public StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c) {
- return new InMemoryStateInternals.InMemoryStateBinder<>(key, c);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/InMemoryStateInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/InMemoryStateInternals.java
deleted file mode 100644
index 8404801..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/InMemoryStateInternals.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.cloud.dataflow.sdk.util.CombineFnUtil;
-import com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder;
-
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * In-memory implementation of {@link StateInternals}. Used in {@code BatchModeExecutionContext}
- * and for running tests that need state.
- */
-@Experimental(Kind.STATE)
-public class InMemoryStateInternals<K> implements StateInternals<K> {
-
- public static <K> InMemoryStateInternals<K> forKey(K key) {
- return new InMemoryStateInternals<>(key);
- }
-
- private final K key;
-
- protected InMemoryStateInternals(K key) {
- this.key = key;
- }
-
- @Override
- public K getKey() {
- return key;
- }
-
- interface InMemoryState<T extends InMemoryState<T>> {
- boolean isCleared();
- T copy();
- }
-
- protected final StateTable<K> inMemoryState = new StateTable<K>() {
- @Override
- protected StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c) {
- return new InMemoryStateBinder<K>(key, c);
- }
- };
-
- public void clear() {
- inMemoryState.clear();
- }
-
- /**
- * Return true if the given state is empty. This is used by the test framework to make sure
- * that the state has been properly cleaned up.
- */
- protected boolean isEmptyForTesting(State state) {
- return ((InMemoryState<?>) state).isCleared();
- }
-
- @Override
- public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
- return inMemoryState.get(namespace, address, StateContexts.nullContext());
- }
-
- @Override
- public <T extends State> T state(
- StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) {
- return inMemoryState.get(namespace, address, c);
- }
-
- /**
- * A {@link StateBinder} that returns In Memory {@link State} objects.
- */
- static class InMemoryStateBinder<K> implements StateBinder<K> {
- private final K key;
- private final StateContext<?> c;
-
- InMemoryStateBinder(K key, StateContext<?> c) {
- this.key = key;
- this.c = c;
- }
-
- @Override
- public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
- return new InMemoryValue<T>();
- }
-
- @Override
- public <T> BagState<T> bindBag(
- final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
- return new InMemoryBag<T>();
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- final CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn());
- }
-
- @Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
- return new InMemoryWatermarkHold<W>(outputTimeFn);
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn);
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
- bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
- }
- }
-
- static final class InMemoryValue<T> implements ValueState<T>, InMemoryState<InMemoryValue<T>> {
- private boolean isCleared = true;
- private T value = null;
-
- @Override
- public void clear() {
- // Even though we're clearing we can't remove this from the in-memory state map, since
- // other users may already have a handle on this Value.
- value = null;
- isCleared = true;
- }
-
- @Override
- public InMemoryValue<T> readLater() {
- return this;
- }
-
- @Override
- public T read() {
- return value;
- }
-
- @Override
- public void write(T input) {
- isCleared = false;
- this.value = input;
- }
-
- @Override
- public InMemoryValue<T> copy() {
- InMemoryValue<T> that = new InMemoryValue<>();
- if (!this.isCleared) {
- that.isCleared = this.isCleared;
- that.value = this.value;
- }
- return that;
- }
-
- @Override
- public boolean isCleared() {
- return isCleared;
- }
- }
-
- static final class InMemoryWatermarkHold<W extends BoundedWindow>
- implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> {
-
- private final OutputTimeFn<? super W> outputTimeFn;
-
- @Nullable
- private Instant combinedHold = null;
-
- public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) {
- this.outputTimeFn = outputTimeFn;
- }
-
- @Override
- public InMemoryWatermarkHold<W> readLater() {
- return this;
- }
-
- @Override
- public void clear() {
- // Even though we're clearing we can't remove this from the in-memory state map, since
- // other users may already have a handle on this WatermarkBagInternal.
- combinedHold = null;
- }
-
- @Override
- public Instant read() {
- return combinedHold;
- }
-
- @Override
- public void add(Instant outputTime) {
- combinedHold = combinedHold == null ? outputTime
- : outputTimeFn.combine(combinedHold, outputTime);
- }
-
- @Override
- public boolean isCleared() {
- return combinedHold == null;
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- @Override
- public Boolean read() {
- return combinedHold == null;
- }
- };
- }
-
- @Override
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
- }
-
- @Override
- public String toString() {
- return Objects.toString(combinedHold);
- }
-
- @Override
- public InMemoryWatermarkHold<W> copy() {
- InMemoryWatermarkHold<W> that =
- new InMemoryWatermarkHold<>(outputTimeFn);
- that.combinedHold = this.combinedHold;
- return that;
- }
- }
-
- static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT>
- implements AccumulatorCombiningState<InputT, AccumT, OutputT>,
- InMemoryState<InMemoryCombiningValue<K, InputT, AccumT, OutputT>> {
- private final K key;
- private boolean isCleared = true;
- private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
- private AccumT accum;
-
- InMemoryCombiningValue(
- K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- this.key = key;
- this.combineFn = combineFn;
- accum = combineFn.createAccumulator(key);
- }
-
- @Override
- public InMemoryCombiningValue<K, InputT, AccumT, OutputT> readLater() {
- return this;
- }
-
- @Override
- public void clear() {
- // Even though we're clearing we can't remove this from the in-memory state map, since
- // other users may already have a handle on this CombiningValue.
- accum = combineFn.createAccumulator(key);
- isCleared = true;
- }
-
- @Override
- public OutputT read() {
- return combineFn.extractOutput(key, accum);
- }
-
- @Override
- public void add(InputT input) {
- isCleared = false;
- accum = combineFn.addInput(key, accum, input);
- }
-
- @Override
- public AccumT getAccum() {
- return accum;
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- @Override
- public Boolean read() {
- return isCleared;
- }
- };
- }
-
- @Override
- public void addAccum(AccumT accum) {
- isCleared = false;
- this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum));
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public boolean isCleared() {
- return isCleared;
- }
-
- @Override
- public InMemoryCombiningValue<K, InputT, AccumT, OutputT> copy() {
- InMemoryCombiningValue<K, InputT, AccumT, OutputT> that =
- new InMemoryCombiningValue<>(key, combineFn);
- if (!this.isCleared) {
- that.isCleared = this.isCleared;
- that.addAccum(accum);
- }
- return that;
- }
- }
-
- static final class InMemoryBag<T> implements BagState<T>, InMemoryState<InMemoryBag<T>> {
- private List<T> contents = new ArrayList<>();
-
- @Override
- public void clear() {
- // Even though we're clearing we can't remove this from the in-memory state map, since
- // other users may already have a handle on this Bag.
- // The result of get/read below must be stable for the lifetime of the bundle within which it
- // was generated. In batch and direct runners the bundle lifetime can be
- // greater than the window lifetime, in which case this method can be called while
- // the result is still in use. We protect against this by hot-swapping instead of
- // clearing the contents.
- contents = new ArrayList<>();
- }
-
- @Override
- public InMemoryBag<T> readLater() {
- return this;
- }
-
- @Override
- public Iterable<T> read() {
- return contents;
- }
-
- @Override
- public void add(T input) {
- contents.add(input);
- }
-
- @Override
- public boolean isCleared() {
- return contents.isEmpty();
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
-
- @Override
- public Boolean read() {
- return contents.isEmpty();
- }
- };
- }
-
- @Override
- public InMemoryBag<T> copy() {
- InMemoryBag<T> that = new InMemoryBag<>();
- that.contents.addAll(this.contents);
- return that;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/MergingStateAccessor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/MergingStateAccessor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/MergingStateAccessor.java
deleted file mode 100644
index 40211d7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/MergingStateAccessor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-
-import java.util.Map;
-
-/**
- * Interface for accessing persistent state while windows are merging.
- *
- * <p>For internal use only.
- */
-@Experimental(Kind.STATE)
-public interface MergingStateAccessor<K, W extends BoundedWindow>
- extends StateAccessor<K> {
- /**
- * Analogous to {@link #access}, but returned as a map from each window which is
- * about to be merged to the corresponding state. Only includes windows which
- * are known to have state.
- */
- <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
- StateTag<? super K, StateT> address);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ReadableState.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ReadableState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ReadableState.java
deleted file mode 100644
index 8f690a3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ReadableState.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-
-/**
- * A {@code StateContents} is produced by the read methods on all {@link State} objects.
- * Calling {@link #read} returns the associated value.
- *
- * <p>This class is similar to {@link java.util.concurrent.Future}, but each invocation of
- * {@link #read} need not return the same value.
- *
- * <p>Getting the {@code StateContents} from a read method indicates the desire to eventually
- * read a value. Depending on the runner this may or may not immediately start the read.
- *
- * @param <T> The type of value returned by {@link #read}.
- */
-@Experimental(Kind.STATE)
-public interface ReadableState<T> {
- /**
- * Read the current value, blocking until it is available.
- *
- * <p>If there will be many calls to {@link #read} for different state in short succession,
- * you should first call {@link #readLater} for all of them so the reads can potentially be
- * batched (depending on the underlying {@link StateInternals} implementation}.
- */
- T read();
-
- /**
- * Indicate that the value will be read later.
- *
- * <p>This allows a {@link StateInternals} implementation to start an asynchronous prefetch or
- * to include this state in the next batch of reads.
- *
- * @return this for convenient chaining
- */
- ReadableState<T> readLater();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/State.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/State.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/State.java
deleted file mode 100644
index 0cef786..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/State.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-/**
- * Base interface for all state locations.
- *
- * <p>Specific types of state add appropriate accessors for reading and writing values, see
- * {@link ValueState}, {@link BagState}, and {@link CombiningState}.
- */
-public interface State {
-
- /**
- * Clear out the state location.
- */
- void clear();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateAccessor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateAccessor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateAccessor.java
deleted file mode 100644
index 6cfbecf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateAccessor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-
-/**
- * Interface for accessing a {@link StateTag} in the current context.
- *
- * <p>For internal use only.
- */
-@Experimental(Kind.STATE)
-public interface StateAccessor<K> {
- /**
- * Access the storage for the given {@code address} in the current window.
- *
- * <p>Never accounts for merged windows. When windows are merged, any state accessed via
- * this method must be eagerly combined and written into the result window.
- */
- <StateT extends State> StateT access(StateTag<? super K, StateT> address);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContext.java
deleted file mode 100644
index 96387d8..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContext.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.state;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-/**
- * Information accessible the state API.
- */
-public interface StateContext<W extends BoundedWindow> {
- /**
- * Returns the {@code PipelineOptions} specified with the
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner}.
- */
- public abstract PipelineOptions getPipelineOptions();
-
- /**
- * Returns the value of the side input for the corresponding state window.
- */
- public abstract <T> T sideInput(PCollectionView<T> view);
-
- /**
- * Returns the window corresponding to the state.
- */
- public abstract W window();
-}