You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:37 UTC
[13/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CounterAggregator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CounterAggregator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CounterAggregator.java
deleted file mode 100644
index 824825f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CounterAggregator.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Max;
-import com.google.cloud.dataflow.sdk.transforms.Min;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.CounterProvider;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-
-/**
- * An implementation of the {@code Aggregator} interface that uses a
- * {@link Counter} as the underlying representation. Supports {@link CombineFn}s
- * from the {@link Sum}, {@link Min} and {@link Max} classes.
- *
- * @param <InputT> the type of input values
- * @param <AccumT> the type of accumulator values
- * @param <OutputT> the type of output value
- */
-public class CounterAggregator<InputT, AccumT, OutputT> implements Aggregator<InputT, OutputT> {
-
- private final Counter<InputT> counter;
- private final CombineFn<InputT, AccumT, OutputT> combiner;
-
- /**
- * Constructs a new aggregator with the given name and aggregation logic
- * specified in the CombineFn argument. The underlying counter is
- * automatically added into the provided CounterSet.
- *
- * <p>If a counter with the same name already exists, it will be reused, as
- * long as it has the same type.
- */
- public CounterAggregator(String name, CombineFn<? super InputT, AccumT, OutputT> combiner,
- CounterSet.AddCounterMutator addCounterMutator) {
- // Safe contravariant cast
- this(constructCounter(name, combiner), addCounterMutator,
- (CombineFn<InputT, AccumT, OutputT>) combiner);
- }
-
- private CounterAggregator(Counter<InputT> counter,
- CounterSet.AddCounterMutator addCounterMutator,
- CombineFn<InputT, AccumT, OutputT> combiner) {
- try {
- this.counter = addCounterMutator.addCounter(counter);
- } catch (IllegalArgumentException ex) {
- throw new IllegalArgumentException(
- "aggregator's name collides with an existing aggregator "
- + "or system-provided counter of an incompatible type");
- }
- this.combiner = combiner;
- }
-
- private static <T> Counter<T> constructCounter(String name,
- CombineFn<? super T, ?, ?> combiner) {
- if (combiner instanceof CounterProvider) {
- @SuppressWarnings("unchecked")
- CounterProvider<T> counterProvider = (CounterProvider<T>) combiner;
- return counterProvider.getCounter(name);
- } else {
- throw new IllegalArgumentException("unsupported combiner in Aggregator: "
- + combiner.getClass().getName());
- }
- }
-
- @Override
- public void addValue(InputT value) {
- counter.addValue(value);
- }
-
- @Override
- public String getName() {
- return counter.getName();
- }
-
- @Override
- public CombineFn<InputT, ?, OutputT> getCombineFn() {
- return combiner;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CredentialFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CredentialFactory.java
deleted file mode 100644
index 4913a1e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CredentialFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.auth.oauth2.Credential;
-
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-
-/**
- * Construct an oauth credential to be used by the SDK and the SDK workers.
- */
-public interface CredentialFactory {
- public Credential getCredential() throws IOException, GeneralSecurityException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Credentials.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Credentials.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Credentials.java
deleted file mode 100644
index 671b131..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Credentials.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.extensions.java6.auth.oauth2.AbstractPromptReceiver;
-import com.google.api.client.extensions.java6.auth.oauth2.AuthorizationCodeInstalledApp;
-import com.google.api.client.googleapis.auth.oauth2.GoogleAuthorizationCodeFlow;
-import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets;
-import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
-import com.google.api.client.googleapis.auth.oauth2.GoogleOAuthConstants;
-import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
-import com.google.api.client.http.GenericUrl;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.util.store.FileDataStoreFactory;
-import com.google.cloud.dataflow.sdk.options.GcpOptions;
-import com.google.common.base.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Provides support for loading credentials.
- */
-public class Credentials {
-
- private static final Logger LOG = LoggerFactory.getLogger(Credentials.class);
-
- /**
- * OAuth 2.0 scopes used by a local worker (not on GCE).
- * The scope cloud-platform provides access to all Cloud Platform resources.
- * cloud-platform isn't sufficient yet for talking to datastore so we request
- * those resources separately.
- *
- * <p>Note that trusted scope relationships don't apply to OAuth tokens, so for
- * services we access directly (GCS) as opposed to through the backend
- * (BigQuery, GCE), we need to explicitly request that scope.
- */
- private static final List<String> SCOPES = Arrays.asList(
- "https://www.googleapis.com/auth/cloud-platform",
- "https://www.googleapis.com/auth/devstorage.full_control",
- "https://www.googleapis.com/auth/userinfo.email",
- "https://www.googleapis.com/auth/datastore");
-
- private static class PromptReceiver extends AbstractPromptReceiver {
- @Override
- public String getRedirectUri() {
- return GoogleOAuthConstants.OOB_REDIRECT_URI;
- }
- }
-
- /**
- * Initializes OAuth2 credentials.
- *
- * <p>This can use 3 different mechanisms for obtaining a credential:
- * <ol>
- * <li>
- * It can fetch the
- * <a href="https://developers.google.com/accounts/docs/application-default-credentials">
- * application default credentials</a>.
- * </li>
- * <li>
- * The user can specify a client secrets file and go through the OAuth2
- * webflow. The credential will then be cached in the user's home
- * directory for reuse. Provide the property "secrets_file" to use this
- * mechanism.
- * </li>
- * <li>
- * The user can specify a file containing a service account.
- * Provide the properties "service_account_keyfile" and
- * "service_account_name" to use this mechanism.
- * </li>
- * </ol>
- * The default mechanism is to use the
- * <a href="https://developers.google.com/accounts/docs/application-default-credentials">
- * application default credentials</a>. The other options can be used by providing the
- * corresponding properties.
- */
- public static Credential getCredential(GcpOptions options)
- throws IOException, GeneralSecurityException {
- String keyFile = options.getServiceAccountKeyfile();
- String accountName = options.getServiceAccountName();
-
- if (keyFile != null && accountName != null) {
- try {
- return getCredentialFromFile(keyFile, accountName, SCOPES);
- } catch (GeneralSecurityException e) {
- throw new IOException("Unable to obtain credentials from file", e);
- }
- }
-
- if (options.getSecretsFile() != null) {
- return getCredentialFromClientSecrets(options, SCOPES);
- }
-
- try {
- return GoogleCredential.getApplicationDefault().createScoped(SCOPES);
- } catch (IOException e) {
- throw new RuntimeException("Unable to get application default credentials. Please see "
- + "https://developers.google.com/accounts/docs/application-default-credentials "
- + "for details on how to specify credentials. This version of the SDK is "
- + "dependent on the gcloud core component version 2015.02.05 or newer to "
- + "be able to get credentials from the currently authorized user via gcloud auth.", e);
- }
- }
-
- /**
- * Loads OAuth2 credential from a local file.
- */
- private static Credential getCredentialFromFile(
- String keyFile, String accountId, Collection<String> scopes)
- throws IOException, GeneralSecurityException {
- GoogleCredential credential = new GoogleCredential.Builder()
- .setTransport(Transport.getTransport())
- .setJsonFactory(Transport.getJsonFactory())
- .setServiceAccountId(accountId)
- .setServiceAccountScopes(scopes)
- .setServiceAccountPrivateKeyFromP12File(new File(keyFile))
- .build();
-
- LOG.info("Created credential from file {}", keyFile);
- return credential;
- }
-
- /**
- * Loads OAuth2 credential from client secrets, which may require an
- * interactive authorization prompt.
- */
- private static Credential getCredentialFromClientSecrets(
- GcpOptions options, Collection<String> scopes)
- throws IOException, GeneralSecurityException {
- String clientSecretsFile = options.getSecretsFile();
-
- Preconditions.checkArgument(clientSecretsFile != null);
- HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
-
- JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
- GoogleClientSecrets clientSecrets;
-
- try {
- clientSecrets = GoogleClientSecrets.load(jsonFactory,
- new FileReader(clientSecretsFile));
- } catch (IOException e) {
- throw new RuntimeException(
- "Could not read the client secrets from file: " + clientSecretsFile,
- e);
- }
-
- FileDataStoreFactory dataStoreFactory =
- new FileDataStoreFactory(new java.io.File(options.getCredentialDir()));
-
- GoogleAuthorizationCodeFlow flow = new GoogleAuthorizationCodeFlow.Builder(
- httpTransport, jsonFactory, clientSecrets, scopes)
- .setDataStoreFactory(dataStoreFactory)
- .setTokenServerUrl(new GenericUrl(options.getTokenServerUrl()))
- .setAuthorizationServerEncodedUrl(options.getAuthorizationServerEncodedUrl())
- .build();
-
- // The credentialId identifies the credential if we're using a persistent
- // credential store.
- Credential credential =
- new AuthorizationCodeInstalledApp(flow, new PromptReceiver())
- .authorize(options.getCredentialId());
-
- LOG.info("Got credential from client secret");
- return credential;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
deleted file mode 100644
index cfb120c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class DataflowPathValidator implements PathValidator {
-
- private DataflowPipelineOptions dataflowOptions;
-
- DataflowPathValidator(DataflowPipelineOptions options) {
- this.dataflowOptions = options;
- }
-
- public static DataflowPathValidator fromOptions(PipelineOptions options) {
- return new DataflowPathValidator(options.as(DataflowPipelineOptions.class));
- }
-
- /**
- * Validates the the input GCS path is accessible and that the path
- * is well formed.
- */
- @Override
- public String validateInputFilePatternSupported(String filepattern) {
- GcsPath gcsPath = getGcsPath(filepattern);
- Preconditions.checkArgument(
- dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
- String returnValue = verifyPath(filepattern);
- verifyPathIsAccessible(filepattern, "Could not find file %s");
- return returnValue;
- }
-
- /**
- * Validates the the output GCS path is accessible and that the path
- * is well formed.
- */
- @Override
- public String validateOutputFilePrefixSupported(String filePrefix) {
- String returnValue = verifyPath(filePrefix);
- verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
- return returnValue;
- }
-
- @Override
- public String verifyPath(String path) {
- GcsPath gcsPath = getGcsPath(path);
- Preconditions.checkArgument(gcsPath.isAbsolute(),
- "Must provide absolute paths for Dataflow");
- Preconditions.checkArgument(!gcsPath.getObject().contains("//"),
- "Dataflow Service does not allow objects with consecutive slashes");
- return gcsPath.toResourceName();
- }
-
- private void verifyPathIsAccessible(String path, String errorMessage) {
- GcsPath gcsPath = getGcsPath(path);
- try {
- Preconditions.checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
- errorMessage, path);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
- e);
- }
- }
-
- private GcsPath getGcsPath(String path) {
- try {
- return GcsPath.fromUri(path);
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(String.format(
- "%s expected a valid 'gs://' path but was given '%s'",
- dataflowOptions.getRunner().getSimpleName(), path), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowReleaseInfo.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowReleaseInfo.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowReleaseInfo.java
deleted file mode 100644
index 39b3005..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowReleaseInfo.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.util.Key;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-
-/**
- * Utilities for working with the Dataflow distribution.
- */
-public final class DataflowReleaseInfo extends GenericJson {
- private static final Logger LOG = LoggerFactory.getLogger(DataflowReleaseInfo.class);
-
- private static final String DATAFLOW_PROPERTIES_PATH =
- "/com/google/cloud/dataflow/sdk/sdk.properties";
-
- private static class LazyInit {
- private static final DataflowReleaseInfo INSTANCE =
- new DataflowReleaseInfo(DATAFLOW_PROPERTIES_PATH);
- }
-
- /**
- * Returns an instance of DataflowReleaseInfo.
- */
- public static DataflowReleaseInfo getReleaseInfo() {
- return LazyInit.INSTANCE;
- }
-
- @Key private String name = "Google Cloud Dataflow Java SDK";
- @Key private String version = "Unknown";
-
- /** Provides the SDK name. */
- public String getName() {
- return name;
- }
-
- /** Provides the SDK version. */
- public String getVersion() {
- return version;
- }
-
- private DataflowReleaseInfo(String resourcePath) {
- Properties properties = new Properties();
-
- InputStream in = DataflowReleaseInfo.class.getResourceAsStream(
- DATAFLOW_PROPERTIES_PATH);
- if (in == null) {
- LOG.warn("Dataflow properties resource not found: {}", resourcePath);
- return;
- }
-
- try {
- properties.load(in);
- } catch (IOException e) {
- LOG.warn("Error loading Dataflow properties resource: ", e);
- }
-
- for (String name : properties.stringPropertyNames()) {
- if (name.equals("name")) {
- // We don't allow the properties to override the SDK name.
- continue;
- }
- put(name, properties.getProperty(name));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectModeExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectModeExecutionContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectModeExecutionContext.java
deleted file mode 100644
index 6e97053..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectModeExecutionContext.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.ValueWithMetadata;
-import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
-import com.google.cloud.dataflow.sdk.util.state.InMemoryStateInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link ExecutionContext} for use in direct mode.
- */
-public class DirectModeExecutionContext
- extends BaseExecutionContext<DirectModeExecutionContext.StepContext> {
-
- private Object key;
- private List<ValueWithMetadata<?>> output = Lists.newArrayList();
- private Map<TupleTag<?>, List<ValueWithMetadata<?>>> sideOutputs = Maps.newHashMap();
-
- protected DirectModeExecutionContext() {}
-
- public static DirectModeExecutionContext create() {
- return new DirectModeExecutionContext();
- }
-
- @Override
- protected StepContext createStepContext(
- String stepName, String transformName, StateSampler stateSampler) {
- return new StepContext(this, stepName, transformName);
- }
-
- public Object getKey() {
- return key;
- }
-
- public void setKey(Object newKey) {
- // The direct mode runner may reorder elements, so we need to keep
- // around the state used for each key.
- for (ExecutionContext.StepContext stepContext : getAllStepContexts()) {
- ((StepContext) stepContext).switchKey(newKey);
- }
- key = newKey;
- }
-
- @Override
- public void noteOutput(WindowedValue<?> outputElem) {
- output.add(ValueWithMetadata.of(outputElem).withKey(getKey()));
- }
-
- @Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> outputElem) {
- List<ValueWithMetadata<?>> output = sideOutputs.get(tag);
- if (output == null) {
- output = Lists.newArrayList();
- sideOutputs.put(tag, output);
- }
- output.add(ValueWithMetadata.of(outputElem).withKey(getKey()));
- }
-
- public <T> List<ValueWithMetadata<T>> getOutput(@SuppressWarnings("unused") TupleTag<T> tag) {
- @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes
- List<ValueWithMetadata<T>> typedOutput = (List) output;
- return typedOutput;
- }
-
- public <T> List<ValueWithMetadata<T>> getSideOutput(TupleTag<T> tag) {
- if (sideOutputs.containsKey(tag)) {
- @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes
- List<ValueWithMetadata<T>> typedOutput = (List) sideOutputs.get(tag);
- return typedOutput;
- } else {
- return Lists.newArrayList();
- }
- }
-
- /**
- * {@link ExecutionContext.StepContext} used in direct mode.
- */
- public static class StepContext extends BaseExecutionContext.StepContext {
-
- /** A map from each key to the state associated with it. */
- private final Map<Object, InMemoryStateInternals<Object>> stateInternals = Maps.newHashMap();
- private InMemoryStateInternals<Object> currentStateInternals = null;
-
- private StepContext(ExecutionContext executionContext, String stepName, String transformName) {
- super(executionContext, stepName, transformName);
- switchKey(null);
- }
-
- public void switchKey(Object newKey) {
- currentStateInternals = stateInternals.get(newKey);
- if (currentStateInternals == null) {
- currentStateInternals = InMemoryStateInternals.forKey(newKey);
- stateInternals.put(newKey, currentStateInternals);
- }
- }
-
- @Override
- public StateInternals<Object> stateInternals() {
- return checkNotNull(currentStateInternals);
- }
-
- @Override
- public TimerInternals timerInternals() {
- throw new UnsupportedOperationException("Direct mode cannot return timerInternals");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectSideInputReader.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectSideInputReader.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectSideInputReader.java
deleted file mode 100644
index ee8c922..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DirectSideInputReader.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-
-/**
- * Basic side input reader wrapping a {@link PTuple} of side input iterables. Encapsulates
- * conversion according to the {@link PCollectionView} and projection to a particular
- * window.
- */
-public class DirectSideInputReader implements SideInputReader {
-
- private PTuple sideInputValues;
-
- private DirectSideInputReader(PTuple sideInputValues) {
- this.sideInputValues = sideInputValues;
- }
-
- public static DirectSideInputReader of(PTuple sideInputValues) {
- return new DirectSideInputReader(sideInputValues);
- }
-
- @Override
- public <T> boolean contains(PCollectionView<T> view) {
- return sideInputValues.has(view.getTagInternal());
- }
-
- @Override
- public boolean isEmpty() {
- return sideInputValues.isEmpty();
- }
-
- @Override
- public <T> T get(PCollectionView<T> view, final BoundedWindow window) {
- final TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal();
- if (!sideInputValues.has(tag)) {
- throw new IllegalArgumentException("calling getSideInput() with unknown view");
- }
-
- if (view.getWindowingStrategyInternal().getWindowFn() instanceof GlobalWindows) {
- return view.fromIterableInternal(sideInputValues.get(tag));
- } else {
- return view.fromIterableInternal(
- Iterables.filter(sideInputValues.get(tag),
- new Predicate<WindowedValue<?>>() {
- @Override
- public boolean apply(WindowedValue<?> element) {
- return element.getWindows().contains(window);
- }
- }));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnInfo.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnInfo.java
deleted file mode 100644
index 15a3a47..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnInfo.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-import java.io.Serializable;
-
-/**
- * Wrapper class holding the necessary information to serialize a DoFn.
- *
- * @param <InputT> the type of the (main) input elements of the DoFn
- * @param <OutputT> the type of the (main) output elements of the DoFn
- */
-public class DoFnInfo<InputT, OutputT> implements Serializable {
- private final DoFn<InputT, OutputT> doFn;
- private final WindowingStrategy<?, ?> windowingStrategy;
- private final Iterable<PCollectionView<?>> sideInputViews;
- private final Coder<InputT> inputCoder;
-
- public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) {
- this.doFn = doFn;
- this.windowingStrategy = windowingStrategy;
- this.sideInputViews = null;
- this.inputCoder = null;
- }
-
- public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy,
- Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder) {
- this.doFn = doFn;
- this.windowingStrategy = windowingStrategy;
- this.sideInputViews = sideInputViews;
- this.inputCoder = inputCoder;
- }
-
- public DoFn<InputT, OutputT> getDoFn() {
- return doFn;
- }
-
- public WindowingStrategy<?, ?> getWindowingStrategy() {
- return windowingStrategy;
- }
-
- public Iterable<PCollectionView<?>> getSideInputViews() {
- return sideInputViews;
- }
-
- public Coder<InputT> getInputCoder() {
- return inputCoder;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunner.java
deleted file mode 100644
index 51c3f39..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunner.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext;
-import com.google.cloud.dataflow.sdk.values.KV;
-
-/**
- * A wrapper interface that represents the execution of a {@link DoFn}.
- *
- * @param <InputT> the type of the (main) input elements of the {@link DoFn}
- * @param <OutputT> the type of the (main) output elements of the {@link DoFn}
- */
-public interface DoFnRunner<InputT, OutputT> {
-  /**
-   * Prepares and calls {@link DoFn#startBundle}.
-   */
-  void startBundle();
-
-  /**
-   * Calls {@link DoFn#processElement} with a {@link ProcessContext} containing the current element.
-   */
-  void processElement(WindowedValue<InputT> elem);
-
-  /**
-   * Calls {@link DoFn#finishBundle} and performs additional tasks, such as
-   * flushing in-memory states.
-   */
-  void finishBundle();
-
-  /**
-   * An internal interface for signaling that a {@link DoFn} requires late data dropping.
-   *
-   * @param <K> the key type of the keyed work items
-   * @param <InputT> the type of the values in the keyed work items
-   * @param <OutputT> the type of the output values paired with each key
-   * @param <W> the window type
-   */
-  interface ReduceFnExecutor<K, InputT, OutputT, W> {
-    /**
-     * Gets this object as a {@link DoFn}.
-     *
-     * <p>Most implementors of this interface are expected to be {@link DoFn} instances, and will
-     * return themselves.
-     */
-    DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn();
-
-    /**
-     * Returns an aggregator that tracks elements that are dropped due to being late.
-     */
-    Aggregator<Long, Long> getDroppedDueToLatenessAggregator();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunnerBase.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunnerBase.java
deleted file mode 100644
index 04ec59f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunnerBase.java
+++ /dev/null
@@ -1,558 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager;
-import com.google.cloud.dataflow.sdk.util.ExecutionContext.StepContext;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormat;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A base implementation of {@link DoFnRunner}.
- *
- * <p>Sub-classes should override {@link #invokeProcessElement}.
- *
- * @param <InputT> the type of the {@link DoFn}'s (main) input elements
- * @param <OutputT> the type of the {@link DoFn}'s (main) output elements
- */
-public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-
-  /** The DoFn being run. */
-  public final DoFn<InputT, OutputT> fn;
-
-  /** The context used for running the DoFn. */
-  public final DoFnContext<InputT, OutputT> context;
-
-  /**
-   * Creates a runner for {@code fn}. All arguments except {@code windowingStrategy} are passed
-   * straight into the shared {@link DoFnContext}; of the strategy only its {@link WindowFn} is
-   * kept, and a {@code null} strategy results in a {@code null} window function.
-   */
-  protected DoFnRunnerBase(
-      PipelineOptions options,
-      DoFn<InputT, OutputT> fn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      CounterSet.AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    this.fn = fn;
-    this.context = new DoFnContext<>(
-        options,
-        fn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        addCounterMutator,
-        windowingStrategy == null ? null : windowingStrategy.getWindowFn());
-  }
-
-  /**
-   * An implementation of {@code OutputManager} using simple lists, for testing and in-memory
-   * contexts such as the {@link DirectPipelineRunner}.
-   */
-  public static class ListOutputManager implements OutputManager {
-
-    /** Outputs received so far, grouped by tag; a list is created on the first output per tag. */
-    private final Map<TupleTag<?>, List<WindowedValue<?>>> outputLists = Maps.newHashMap();
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
-
-      if (outputList == null) {
-        outputList = Lists.newArrayList();
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        List<WindowedValue<?>> untypedList = (List) outputList;
-        outputLists.put(tag, untypedList);
-      }
-
-      outputList.add(output);
-    }
-
-    /**
-     * Returns the outputs recorded for {@code tag}, or an empty list if there were none.
-     */
-    public <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) {
-      // Safe cast by design, inexpressible in Java without rawtypes
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
-      return (outputList != null) ? outputList : Collections.<WindowedValue<T>>emptyList();
-    }
-  }
-
-  @Override
-  public void startBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.startBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  /**
-   * Processes one element. If the element is in at most one window, or the {@link DoFn} neither
-   * implements {@link RequiresWindowAccess} nor has any side inputs, the element is passed to
-   * {@link #invokeProcessElement} as-is. Otherwise it is exploded into one value per window, so
-   * that each invocation observes exactly one window.
-   */
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    if (elem.getWindows().size() <= 1
-        || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
-            && context.sideInputReader.isEmpty())) {
-      invokeProcessElement(elem);
-    } else {
-      // We could modify the windowed value (and the processContext) to
-      // avoid repeated allocations, but this is more straightforward.
-      for (BoundedWindow window : elem.getWindows()) {
-        invokeProcessElement(WindowedValue.of(
-            elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
-      }
-    }
-  }
-
-  /**
-   * Invokes {@link DoFn#processElement} after certain pre-processing has been done in
-   * {@link DoFnRunnerBase#processElement}.
-   */
-  protected abstract void invokeProcessElement(WindowedValue<InputT> elem);
-
-  @Override
-  public void finishBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.finishBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  /**
-   * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}.
-   *
-   * @param <InputT> the type of the DoFn's (main) input elements
-   * @param <OutputT> the type of the DoFn's (main) output elements
-   */
-  private static class DoFnContext<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.Context {
-    private static final int MAX_SIDE_OUTPUTS = 1000;
-
-    final PipelineOptions options;
-    final DoFn<InputT, OutputT> fn;
-    final SideInputReader sideInputReader;
-    final OutputManager outputManager;
-    final TupleTag<OutputT> mainOutputTag;
-    final StepContext stepContext;
-    final CounterSet.AddCounterMutator addCounterMutator;
-    final WindowFn<?, ?> windowFn;
-
-    /**
-     * The set of known output tags, some of which may be undeclared, so we can throw an
-     * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
-     */
-    private final Set<TupleTag<?>> outputTags;
-
-    public DoFnContext(PipelineOptions options,
-        DoFn<InputT, OutputT> fn,
-        SideInputReader sideInputReader,
-        OutputManager outputManager,
-        TupleTag<OutputT> mainOutputTag,
-        List<TupleTag<?>> sideOutputTags,
-        StepContext stepContext,
-        CounterSet.AddCounterMutator addCounterMutator,
-        WindowFn<?, ?> windowFn) {
-      fn.super();
-      this.options = options;
-      this.fn = fn;
-      this.sideInputReader = sideInputReader;
-      this.outputManager = outputManager;
-      this.mainOutputTag = mainOutputTag;
-      this.outputTags = Sets.newHashSet();
-
-      // The main output and every declared side output start out as known tags.
-      outputTags.add(mainOutputTag);
-      outputTags.addAll(sideOutputTags);
-
-      this.stepContext = stepContext;
-      this.addCounterMutator = addCounterMutator;
-      this.windowFn = windowFn;
-      // Called last so that every field above is already initialized.
-      super.setupDelegateAggregators();
-    }
-
-    //////////////////////////////////////////////////////////////////////////////
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    /**
-     * Builds a {@link WindowedValue}. A {@code null} timestamp is replaced by
-     * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. {@code null} windows are computed by asking the
-     * {@link #windowFn} to assign windows with a context that exposes only the original (possibly
-     * {@code null}) timestamp.
-     */
-    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
-        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
-      final Instant inputTimestamp = timestamp;
-
-      if (timestamp == null) {
-        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-
-      if (windows == null) {
-        try {
-          // The windowFn can never succeed at accessing the element, so its type does not
-          // matter here
-          @SuppressWarnings("unchecked")
-          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
-          windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
-            @Override
-            public Object element() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input element when none was available");
-            }
-
-            @Override
-            public Instant timestamp() {
-              if (inputTimestamp == null) {
-                throw new UnsupportedOperationException(
-                    "WindowFn attempted to access input timestamp when none was available");
-              }
-              return inputTimestamp;
-            }
-
-            @Override
-            public Collection<? extends BoundedWindow> windows() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input windows when none were available");
-            }
-          });
-        } catch (Exception e) {
-          throw UserCodeException.wrap(e);
-        }
-      }
-
-      return WindowedValue.of(output, timestamp, windows, pane);
-    }
-
-    /**
-     * Reads {@code view} in the side input window that corresponds to {@code mainInputWindow}.
-     *
-     * @throws IllegalArgumentException if the side input reader does not contain {@code view}
-     */
-    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-      if (!sideInputReader.contains(view)) {
-        throw new IllegalArgumentException("calling sideInput() with unknown view");
-      }
-      BoundedWindow sideInputWindow =
-          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
-      return sideInputReader.get(view, sideInputWindow);
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    /** Emits to the main output and, if a step context exists, notes the output there. */
-    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
-      outputManager.output(mainOutputTag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteOutput(windowedElem);
-      }
-    }
-
-    protected <T> void sideOutputWindowedValue(TupleTag<T> tag,
-        T output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    /**
-     * Emits to the side output {@code tag}, registering it as a known tag first.
-     *
-     * @throws IllegalArgumentException if {@code tag} is new and {@link #MAX_SIDE_OUTPUTS} tags
-     *     are already known
-     */
-    protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
-      if (!outputTags.contains(tag)) {
-        // This tag wasn't declared nor was it seen before during this execution.
-        // Thus, this must be a new, undeclared and unconsumed output.
-        // To prevent likely user errors, enforce the limit on the number of side
-        // outputs.
-        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
-          throw new IllegalArgumentException(
-              "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
-        }
-        outputTags.add(tag);
-      }
-
-      outputManager.output(tag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteSideOutput(tag, windowedElem);
-      }
-    }
-
-    // Following implementations of output, outputWithTimestamp, and sideOutput
-    // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by
-    // ProcessContext's versions in DoFn.processElement.
-    @Override
-    public void output(OutputT output) {
-      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      Preconditions.checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
-      sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      Preconditions.checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
-      sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    /**
-     * Prefixes {@code userName} with the step name, plus "user-" unless the DoFn is annotated
-     * with {@link SystemDoFnInternal}.
-     */
-    private String generateInternalAggregatorName(String userName) {
-      boolean system = fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
-      // NOTE(review): unlike the output methods above, this does not guard against a null
-      // stepContext -- confirm that aggregators are never created without one.
-      return (system ? "" : "user-") + stepContext.getStepName() + "-" + userName;
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      Preconditions.checkNotNull(combiner,
-          "Combiner passed to createAggregator cannot be null");
-      return new CounterAggregator<>(generateInternalAggregatorName(name),
-          combiner, addCounterMutator);
-    }
-  }
-
-  /**
-   * Returns a new {@code DoFn.ProcessContext} for the given element.
-   */
-  protected DoFn<InputT, OutputT>.ProcessContext createProcessContext(WindowedValue<InputT> elem) {
-    return new DoFnProcessContext<>(fn, context, elem);
-  }
-
-  /**
-   * Converts a throwable raised by the {@link DoFn} into a {@link RuntimeException} via
-   * {@link UserCodeException#wrapIf}, passing whether the DoFn is a user (non-system) DoFn.
-   *
-   * <p>The exception is returned rather than thrown so that call sites can write
-   * {@code throw wrapUserCodeException(t);}.
-   */
-  protected RuntimeException wrapUserCodeException(Throwable t) {
-    return UserCodeException.wrapIf(!isSystemDoFn(), t);
-  }
-
-  /** Returns whether the DoFn's class is annotated with {@link SystemDoFnInternal}. */
-  private boolean isSystemDoFn() {
-    return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
-  }
-
-  /**
-   * A concrete implementation of {@code DoFn.ProcessContext} used for
-   * running a {@link DoFn} over a single element.
-   *
-   * @param <InputT> the type of the DoFn's (main) input elements
-   * @param <OutputT> the type of the DoFn's (main) output elements
-   */
-  static class DoFnProcessContext<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.ProcessContext {
-
-    final DoFn<InputT, OutputT> fn;
-    final DoFnContext<InputT, OutputT> context;
-    final WindowedValue<InputT> windowedValue;
-
-    public DoFnProcessContext(DoFn<InputT, OutputT> fn,
-        DoFnContext<InputT, OutputT> context,
-        WindowedValue<InputT> windowedValue) {
-      fn.super();
-      this.fn = fn;
-      this.context = context;
-      this.windowedValue = windowedValue;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public InputT element() {
-      return windowedValue.getValue();
-    }
-
-    /**
-     * Reads {@code view} for the element's single window. An element with no windows is treated
-     * as being in the global window only when the window function is {@link GlobalWindows}.
-     *
-     * @throws IllegalStateException if the element is in zero (non-global) or multiple windows
-     */
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      Preconditions.checkNotNull(view, "View passed to sideInput cannot be null");
-      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
-      BoundedWindow window;
-      if (!windowIter.hasNext()) {
-        if (context.windowFn instanceof GlobalWindows) {
-          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
-          // without windows
-          window = GlobalWindow.INSTANCE;
-        } else {
-          throw new IllegalStateException(
-              "sideInput called when main input element is not in any windows");
-        }
-      } else {
-        window = windowIter.next();
-        if (windowIter.hasNext()) {
-          throw new IllegalStateException(
-              "sideInput called when main input element is in multiple windows");
-        }
-      }
-      return context.sideInput(view, window);
-    }
-
-    /**
-     * Returns the element's only window.
-     *
-     * @throws UnsupportedOperationException if the DoFn does not implement
-     *     {@link RequiresWindowAccess}
-     */
-    @Override
-    public BoundedWindow window() {
-      if (!(fn instanceof RequiresWindowAccess)) {
-        throw new UnsupportedOperationException(
-            "window() is only available in the context of a DoFn marked as RequiresWindowAccess.");
-      }
-      return Iterables.getOnlyElement(windows());
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return windowedValue.getPane();
-    }
-
-    /** Emits {@code output} with the current element's timestamp, windows and pane. */
-    @Override
-    public void output(OutputT output) {
-      context.outputWindowedValue(windowedValue.withValue(output));
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      checkTimestamp(timestamp);
-      context.outputWindowedValue(output, timestamp,
-          windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      context.outputWindowedValue(output, timestamp, windows, pane);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      Preconditions.checkNotNull(tag, "Tag passed to sideOutput cannot be null");
-      context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      Preconditions.checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
-      checkTimestamp(timestamp);
-      context.sideOutputWindowedValue(
-          tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    @Override
-    public Instant timestamp() {
-      return windowedValue.getTimestamp();
-    }
-
-    public Collection<? extends BoundedWindow> windows() {
-      return windowedValue.getWindows();
-    }
-
-    /**
-     * Rejects output timestamps earlier than the input timestamp minus the DoFn's allowed skew.
-     *
-     * @throws IllegalArgumentException if {@code timestamp} is too early
-     */
-    private void checkTimestamp(Instant timestamp) {
-      if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
-        throw new IllegalArgumentException(String.format(
-            "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
-            + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
-            + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
-            timestamp, windowedValue.getTimestamp(),
-            PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
-      }
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return new WindowingInternals<InputT, OutputT>() {
-        @Override
-        public void outputWindowedValue(OutputT output, Instant timestamp,
-            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-          context.outputWindowedValue(output, timestamp, windows, pane);
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return windowedValue.getWindows();
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return windowedValue.getPane();
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return context.stepContext.timerInternals();
-        }
-
-        @Override
-        public <T> void writePCollectionViewData(
-            TupleTag<?> tag,
-            Iterable<WindowedValue<T>> data,
-            Coder<T> elemCoder) throws IOException {
-          @SuppressWarnings("unchecked")
-          Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder();
-
-          context.stepContext.writePCollectionViewData(
-              tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
-              window(), windowCoder);
-        }
-
-        @Override
-        public StateInternals<?> stateInternals() {
-          return context.stepContext.stateInternals();
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          return context.sideInput(view, mainInputWindow);
-        }
-      };
-    }
-
-    @Override
-    protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
-        createAggregatorInternal(
-            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunners.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunners.java
deleted file mode 100644
index d56b36e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunners.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.DoFnRunner.ReduceFnExecutor;
-import com.google.cloud.dataflow.sdk.util.ExecutionContext.StepContext;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet.AddCounterMutator;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.util.List;
-
-/**
- * Static utility methods that provide {@link DoFnRunner} implementations.
- */
-public class DoFnRunners {
-  /**
-   * Information about how to create output receivers and output to them.
-   */
-  public interface OutputManager {
-    /**
-     * Outputs a single element to the receiver indicated by the given {@link TupleTag}.
-     */
-    <T> void output(TupleTag<T> tag, WindowedValue<T> output);
-  }
-
-  /**
-   * Returns a basic implementation of {@link DoFnRunner} that works for most {@link DoFn DoFns}.
-   *
-   * <p>It invokes {@link DoFn#processElement} for each input.
-   */
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
-      PipelineOptions options,
-      DoFn<InputT, OutputT> fn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      CounterSet.AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return new SimpleDoFnRunner<>(
-        options,
-        fn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        addCounterMutator,
-        windowingStrategy);
-  }
-
-  /**
-   * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
-   *
-   * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
-   * Internally it wraps a {@link #simpleRunner} over {@code reduceFnExecutor.asDoFn()}, using the
-   * step's timer internals and the executor's dropped-due-to-lateness aggregator.
-   */
-  public static <K, InputT, OutputT, W extends BoundedWindow>
-      DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
-          PipelineOptions options,
-          ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
-          SideInputReader sideInputReader,
-          OutputManager outputManager,
-          TupleTag<KV<K, OutputT>> mainOutputTag,
-          List<TupleTag<?>> sideOutputTags,
-          StepContext stepContext,
-          CounterSet.AddCounterMutator addCounterMutator,
-          WindowingStrategy<?, W> windowingStrategy) {
-    DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> simpleDoFnRunner =
-        simpleRunner(
-            options,
-            reduceFnExecutor.asDoFn(),
-            sideInputReader,
-            outputManager,
-            mainOutputTag,
-            sideOutputTags,
-            stepContext,
-            addCounterMutator,
-            windowingStrategy);
-    return new LateDataDroppingDoFnRunner<>(
-        simpleDoFnRunner,
-        windowingStrategy,
-        stepContext.timerInternals(),
-        reduceFnExecutor.getDroppedDueToLatenessAggregator());
-  }
-
-  /**
-   * Returns the default {@link DoFnRunner} for {@code doFn}.
-   *
-   * <p>If {@code doFn} implements {@link ReduceFnExecutor}, the result is a
-   * {@link #lateDataDroppingRunner}; otherwise it is a {@link #simpleRunner}. The late-data path
-   * needs raw-type casts because {@code InputT}/{@code OutputT} cannot be statically related to
-   * the {@code KeyedWorkItem}/{@code KV} types that runner works with.
-   */
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
-      PipelineOptions options,
-      DoFn<InputT, OutputT> doFn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    if (doFn instanceof ReduceFnExecutor) {
-      @SuppressWarnings("rawtypes")
-      ReduceFnExecutor fn = (ReduceFnExecutor) doFn;
-      @SuppressWarnings({"unchecked", "cast", "rawtypes"})
-      DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
-          options,
-          fn,
-          sideInputReader,
-          outputManager,
-          (TupleTag) mainOutputTag,
-          sideOutputTags,
-          stepContext,
-          addCounterMutator,
-          (WindowingStrategy) windowingStrategy);
-      return runner;
-    }
-    return simpleRunner(
-        options,
-        doFn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        addCounterMutator,
-        windowingStrategy);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutableTrigger.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutableTrigger.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutableTrigger.java
deleted file mode 100644
index 22a3762..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutableTrigger.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.OnceTrigger;
-import com.google.common.base.Preconditions;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A wrapper around a trigger used during execution. While an actual trigger may appear multiple
- * times (both in the same trigger expression and in other trigger expressions), the
- * {@code ExecutableTrigger} wrapped around them forms a tree (only one occurrence).
- *
- * <p>Nodes are numbered in pre-order. A node's {@link #getTriggerIndex()} comes before every
- * index in its subtree, and the subtree's indices are exactly those strictly between
- * {@link #getTriggerIndex()} and {@link #getFirstIndexAfterSubtree()}.
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used.
- */
-public class ExecutableTrigger<W extends BoundedWindow> implements Serializable {
-
-  /** Store the index assigned to this trigger. */
-  private final int triggerIndex;
-  /** One past the largest index used by this trigger's subtree. */
-  private final int firstIndexAfterSubtree;
-  private final List<ExecutableTrigger<W>> subTriggers = new ArrayList<>();
-  private final Trigger<W> trigger;
-
-  /** Wraps {@code trigger} (and, recursively, its sub-triggers), numbering from index 0. */
-  public static <W extends BoundedWindow> ExecutableTrigger<W> create(Trigger<W> trigger) {
-    return create(trigger, 0);
-  }
-
-  private static <W extends BoundedWindow> ExecutableTrigger<W> create(
-      Trigger<W> trigger, int nextUnusedIndex) {
-    if (trigger instanceof OnceTrigger) {
-      return new ExecutableOnceTrigger<W>((OnceTrigger<W>) trigger, nextUnusedIndex);
-    } else {
-      return new ExecutableTrigger<W>(trigger, nextUnusedIndex);
-    }
-  }
-
-  public static <W extends BoundedWindow> ExecutableTrigger<W> createForOnceTrigger(
-      OnceTrigger<W> trigger, int nextUnusedIndex) {
-    return new ExecutableOnceTrigger<W>(trigger, nextUnusedIndex);
-  }
-
-  private ExecutableTrigger(Trigger<W> trigger, int nextUnusedIndex) {
-    this.trigger = Preconditions.checkNotNull(trigger, "trigger must not be null");
-    this.triggerIndex = nextUnusedIndex;
-
-    // Sub-triggers are numbered in pre-order, starting right after this node.
-    int nextIndex = nextUnusedIndex + 1;
-    if (trigger.subTriggers() != null) {
-      for (Trigger<W> subTrigger : trigger.subTriggers()) {
-        ExecutableTrigger<W> subExecutable = create(subTrigger, nextIndex);
-        subTriggers.add(subExecutable);
-        nextIndex = subExecutable.firstIndexAfterSubtree;
-      }
-    }
-    this.firstIndexAfterSubtree = nextIndex;
-  }
-
-  /**
-   * Returns the direct sub-triggers in declaration order. The returned list is the internal
-   * backing list, not a copy.
-   */
-  public List<ExecutableTrigger<W>> subTriggers() {
-    return subTriggers;
-  }
-
-  @Override
-  public String toString() {
-    return trigger.toString();
-  }
-
-  /**
-   * Return the underlying trigger specification corresponding to this {@code ExecutableTrigger}.
-   */
-  public Trigger<W> getSpec() {
-    return trigger;
-  }
-
-  public int getTriggerIndex() {
-    return triggerIndex;
-  }
-
-  public final int getFirstIndexAfterSubtree() {
-    return firstIndexAfterSubtree;
-  }
-
-  /** Delegates to {@link Trigger#isCompatible} on the two underlying specifications. */
-  public boolean isCompatible(ExecutableTrigger<W> other) {
-    return trigger.isCompatible(other.trigger);
-  }
-
-  /**
-   * Returns the direct sub-trigger whose subtree contains {@code index}.
-   *
-   * @throws IllegalStateException if {@code index} is not strictly inside this trigger's subtree
-   */
-  public ExecutableTrigger<W> getSubTriggerContaining(int index) {
-    Preconditions.checkState(index > triggerIndex && index < firstIndexAfterSubtree,
-        "Cannot find sub-trigger containing index not in this tree.");
-    // Sub-triggers are ordered by index, so the answer is the last one starting at or before it.
-    ExecutableTrigger<W> previous = null;
-    for (ExecutableTrigger<W> subTrigger : subTriggers) {
-      if (index < subTrigger.triggerIndex) {
-        return previous;
-      }
-      previous = subTrigger;
-    }
-    return previous;
-  }
-
-  /**
-   * Invoke the {@link Trigger#onElement} method for this trigger, ensuring that the bits are
-   * properly updated if the trigger finishes.
-   */
-  public void invokeOnElement(Trigger<W>.OnElementContext c) throws Exception {
-    trigger.onElement(c.forTrigger(this));
-  }
-
-  /**
-   * Invoke the {@link Trigger#onMerge} method for this trigger, ensuring that the bits are properly
-   * updated.
-   */
-  public void invokeOnMerge(Trigger<W>.OnMergeContext c) throws Exception {
-    trigger.onMerge(c.forTrigger(this));
-  }
-
-  /** Invokes {@link Trigger#shouldFire} with a context scoped to this trigger. */
-  public boolean invokeShouldFire(Trigger<W>.TriggerContext c) throws Exception {
-    return trigger.shouldFire(c.forTrigger(this));
-  }
-
-  /** Invokes {@link Trigger#onFire} with a context scoped to this trigger. */
-  public void invokeOnFire(Trigger<W>.TriggerContext c) throws Exception {
-    trigger.onFire(c.forTrigger(this));
-  }
-
-  /**
-   * Invokes {@link Trigger#clear} for this trigger.
-   */
-  public void invokeClear(Trigger<W>.TriggerContext c) throws Exception {
-    trigger.clear(c.forTrigger(this));
-  }
-
-  /**
-   * {@link ExecutableTrigger} created for every {@link OnceTrigger}. It marks triggers that should
-   * always FIRE_AND_FINISH and never just FIRE.
-   *
-   * <p>NOTE(review): this subclass currently adds no behavior of its own, so it does not itself
-   * enforce that contract.
-   */
-  private static class ExecutableOnceTrigger<W extends BoundedWindow> extends ExecutableTrigger<W> {
-
-    public ExecutableOnceTrigger(OnceTrigger<W> trigger, int nextUnusedIndex) {
-      super(trigger, nextUnusedIndex);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutionContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutionContext.java
deleted file mode 100644
index cff5b95..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutionContext.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * Context for the current execution. This is guaranteed to exist during processing,
- * but does not necessarily persist between different batches of work.
- */
-public interface ExecutionContext {
-  /**
-   * Returns the {@link StepContext} associated with the given step; per the method name,
-   * implementations presumably create it on first request.
-   */
-  StepContext getOrCreateStepContext(
-      String stepName, String transformName, StateSampler stateSampler);
-
-  /**
-   * Returns a collection view of all of the {@link StepContext}s.
-   */
-  Collection<? extends StepContext> getAllStepContexts();
-
-  /**
-   * Hook for implementations; called whenever
-   * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#output} is called.
-   */
-  void noteOutput(WindowedValue<?> output);
-
-  /**
-   * Hook for implementations; called whenever
-   * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#sideOutput} is called.
-   */
-  void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
-
-  /**
-   * Per-step, per-key context used for retrieving state.
-   */
-  interface StepContext {
-
-    /**
-     * The name of the step.
-     */
-    String getStepName();
-
-    /**
-     * The name of the transform for the step.
-     */
-    String getTransformName();
-
-    /**
-     * Hook for implementations; called whenever
-     * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#output} is called.
-     */
-    void noteOutput(WindowedValue<?> output);
-
-    /**
-     * Hook for implementations; called whenever
-     * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#sideOutput} is called.
-     */
-    void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
-
-    /**
-     * Writes the given {@code PCollectionView} data to a globally accessible location.
-     *
-     * @throws IOException if the data cannot be written
-     */
-    <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data,
-        Coder<Iterable<WindowedValue<T>>> dataCoder,
-        W window,
-        Coder<W> windowCoder)
-        throws IOException;
-
-    /**
-     * Returns the {@link StateInternals} for this step.
-     */
-    StateInternals<?> stateInternals();
-
-    /**
-     * Returns the {@link TimerInternals} for this step.
-     */
-    TimerInternals timerInternals();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayInputStream.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayInputStream.java
deleted file mode 100644
index dff5fd1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayInputStream.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-/**
- * A {@link ByteArrayInputStream} whose unread content can be taken in a single call, handing back
- * the backing array itself (no copy) when nothing has been consumed yet.
- */
-public class ExposedByteArrayInputStream extends ByteArrayInputStream {
-
-  public ExposedByteArrayInputStream(byte[] bytes) {
-    super(bytes);
-  }
-
-  /**
-   * Consumes and returns every byte not yet read, leaving the stream exhausted.
-   *
-   * <p>If the stream is still at position 0 and its valid range spans the whole backing array,
-   * that array is returned directly rather than copied, so the caller must not modify it.
-   * Otherwise a new array holding just the unread bytes is returned.
-   *
-   * @return the unread bytes
-   * @throws IOException declared for API compatibility; this implementation does not throw it
-   */
-  public byte[] readAll() throws IOException {
-    boolean untouchedAndFull = pos == 0 && count == buf.length;
-    if (untouchedAndFull) {
-      pos = count;
-      return buf;
-    }
-    int remaining = count - pos;
-    byte[] rest = new byte[remaining];
-    System.arraycopy(buf, pos, rest, 0, remaining);
-    pos += remaining;
-    return rest;
-  }
-
-  /**
-   * Closes the stream without a checked exception; any {@link IOException} from the superclass
-   * is rethrown wrapped in a {@link RuntimeException}.
-   */
-  @Override
-  public void close() {
-    try {
-      super.close();
-    } catch (IOException e) {
-      throw new RuntimeException("Unexpected IOException closing ByteArrayInputStream", e);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayOutputStream.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayOutputStream.java
deleted file mode 100644
index d8e4d50..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ExposedByteArrayOutputStream.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-/**
- * {@link ByteArrayOutputStream} special cased to treat writes of a single byte-array specially.
- * When calling {@link #toByteArray()} after writing only one {@code byte[]} using
- * {@link #writeAndOwn(byte[])}, it will return that array directly.
- *
- * <p>Any other kind of write switches the stream into "fallback" mode, in which it behaves like a
- * plain {@link ByteArrayOutputStream} until the next {@link #reset()}.
- */
-public class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
-
-  /**
-   * The stream's own buffer, parked here while {@code buf} aliases an array handed to
-   * {@link #writeAndOwn(byte[])}; {@code null} whenever {@code buf} is the stream's own buffer.
-   */
-  private byte[] swappedBuffer;
-
-  /**
-   * If true, this stream doesn't allow direct access to the passed in byte-array. It behaves just
-   * like a normal {@link ByteArrayOutputStream}.
-   *
-   * <p>It is set to true by any write other than a {@link #writeAndOwn(byte[])} into an empty
-   * stream that is not already in fallback mode, and cleared by {@link #reset()}.
-   */
-  private boolean isFallback = false;
-
-  /**
-   * Falls back to the behavior of a normal {@link ByteArrayOutputStream}. If {@code buf} currently
-   * aliases a caller-owned array, its bytes are first copied into the stream's own buffer, so
-   * later writes never touch the caller's array.
-   */
-  private void fallback() {
-    isFallback = true;
-    if (swappedBuffer != null) {
-      byte[] ownedBytes = buf;
-      int ownedCount = count;
-      buf = swappedBuffer;
-      swappedBuffer = null;
-      count = 0;
-      super.write(ownedBytes, 0, ownedCount);
-    }
-  }
-
-  /**
-   * Writes {@code b} to the stream and takes ownership of {@code b}.
-   *
-   * <p>If the stream is empty and not in fallback mode, {@code b} itself is used as the content of
-   * the stream and no bytes are copied. Otherwise {@code b} is copied like a normal write.
-   *
-   * <p><i>Note: After passing any byte array to this method, it must not be modified again.</i>
-   *
-   * @param b the bytes to append; an empty array is ignored
-   * @throws IOException declared by {@link java.io.OutputStream#write(byte[])}, which this method
-   *     delegates to when it has to copy
-   */
-  public void writeAndOwn(byte[] b) throws IOException {
-    if (b.length == 0) {
-      return;
-    }
-    // Only alias b on a fresh stream. In fallback mode buf must remain the stream's own buffer:
-    // otherwise reset() would leave the caller's array installed, and the next write would copy
-    // its stale bytes back into the stream.
-    if (count == 0 && !isFallback) {
-      swappedBuffer = buf;
-      buf = b;
-      count = b.length;
-    } else {
-      fallback();
-      super.write(b);
-    }
-  }
-
-  @Override
-  public void write(byte[] b, int off, int len) {
-    fallback();
-    super.write(b, off, len);
-  }
-
-  @Override
-  public void write(int b) {
-    fallback();
-    super.write(b);
-  }
-
-  /**
-   * Returns the stream's content. If the only write since construction or the last reset was a
-   * zero-copy {@link #writeAndOwn(byte[])}, that very array is returned (not a copy); otherwise a
-   * fresh copy is returned, as by {@link ByteArrayOutputStream#toByteArray()}.
-   */
-  @Override
-  public byte[] toByteArray() {
-    // Note: count == buf.length is not a correct criterion to "return buf;", because the internal
-    // buf may be reused after reset().
-    if (!isFallback && count > 0) {
-      return buf;
-    } else {
-      return super.toByteArray();
-    }
-  }
-
-  /**
-   * Empties the stream, reinstates the stream's own buffer if a caller-owned array was in use,
-   * and re-enables the zero-copy {@link #writeAndOwn(byte[])} path.
-   */
-  @Override
-  public void reset() {
-    if (swappedBuffer != null) {
-      buf = swappedBuffer;
-      swappedBuffer = null;
-    }
-    count = 0;
-    isFallback = false;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FileIOChannelFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FileIOChannelFactory.java
deleted file mode 100644
index 77d0b83..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FileIOChannelFactory.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.PathMatcher;
-import java.nio.file.Paths;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.regex.Matcher;
-
-/**
- * Implements IOChannelFactory for local files.
- */
-public class FileIOChannelFactory implements IOChannelFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(FileIOChannelFactory.class);
-
-  /**
-   * Returns the paths of the normal files under the parent directory of {@code spec} whose
-   * absolute path matches {@code spec} interpreted as a glob.
-   *
-   * <p>This implementation only allows for wildcards in the file name; the directory portion must
-   * exist as-is.
-   *
-   * @throws IOException if {@code spec} has no parent directory (e.g. a filesystem root) or the
-   *     parent directory does not exist
-   */
-  @Override
-  public Collection<String> match(String spec) throws IOException {
-    File file = new File(spec);
-
-    File parent = file.getAbsoluteFile().getParentFile();
-    // getParentFile() is null for a filesystem root; report that like a missing directory rather
-    // than failing with a NullPointerException.
-    if (parent == null || !parent.exists()) {
-      throw new IOException("Unable to find parent directory of " + spec);
-    }
-
-    // Method getAbsolutePath() on Windows platform may return something like
-    // "c:\temp\file.txt". FileSystem.getPathMatcher() call below will treat
-    // '\' (backslash) as an escape character, instead of a directory
-    // separator. Replacing backslash with double-backslash solves the problem.
-    // We perform the replacement on all platforms, even those that allow
-    // backslash as a part of the filename, because Globs.toRegexPattern will
-    // eat one backslash.
-    // (The first argument is a regex; quoteReplacement("\\") happens to yield the regex that
-    // matches a single literal backslash.)
-    String pathToMatch = file.getAbsolutePath().replaceAll(Matcher.quoteReplacement("\\"),
-        Matcher.quoteReplacement("\\\\"));
-
-    final PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathToMatch);
-
-    Iterable<File> files = com.google.common.io.Files.fileTreeTraverser().preOrderTraversal(parent);
-    Iterable<File> matchedFiles = Iterables.filter(files,
-        Predicates.and(
-            com.google.common.io.Files.isFile(),
-            new Predicate<File>() {
-              @Override
-              public boolean apply(File input) {
-                return matcher.matches(input.toPath());
-              }
-            }));
-
-    List<String> result = new LinkedList<>();
-    for (File match : matchedFiles) {
-      result.add(match.getPath());
-    }
-
-    return result;
-  }
-
-  @Override
-  public ReadableByteChannel open(String spec) throws IOException {
-    LOG.debug("opening file {}", spec);
-    @SuppressWarnings("resource") // The caller is responsible for closing the channel.
-    FileInputStream inputStream = new FileInputStream(spec);
-    // Use this method for creating the channel (rather than new FileChannel) so that we get
-    // regular FileNotFoundException. Closing the underlying channel will close the inputStream.
-    return inputStream.getChannel();
-  }
-
-  /**
-   * Creates (or truncates) the file at {@code spec}, creating missing parent directories first,
-   * and returns a buffered channel writing to it. {@code mimeType} is not used.
-   *
-   * @throws IOException if the parent directories cannot be created or the file cannot be opened
-   */
-  @Override
-  public WritableByteChannel create(String spec, String mimeType)
-      throws IOException {
-    LOG.debug("creating file {}", spec);
-    File file = new File(spec);
-    File parent = file.getAbsoluteFile().getParentFile();
-    if (parent != null && !parent.exists() && !parent.mkdirs()) {
-      throw new IOException("Unable to create parent directories for '" + spec + "'");
-    }
-    return Channels.newChannel(
-        new BufferedOutputStream(new FileOutputStream(file)));
-  }
-
-  /**
-   * Returns the size in bytes of the file at {@code spec}.
-   *
-   * @throws FileNotFoundException if no file exists at {@code spec}; the message is {@code spec}
-   *     and the cause is the underlying {@link NoSuchFileException}
-   */
-  @Override
-  public long getSizeBytes(String spec) throws IOException {
-    try {
-      return Files.size(FileSystems.getDefault().getPath(spec));
-    } catch (NoSuchFileException e) {
-      // FileSystemException#getReason() may be null (the JDK often sets only the file), which
-      // would yield a message-less exception; report the spec instead and keep the cause.
-      FileNotFoundException notFound = new FileNotFoundException(spec);
-      notFound.initCause(e);
-      throw notFound;
-    }
-  }
-
-  @Override
-  public boolean isReadSeekEfficient(String spec) throws IOException {
-    return true;
-  }
-
-  @Override
-  public String resolve(String path, String other) throws IOException {
-    return Paths.get(path).resolve(other).toString();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggers.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggers.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggers.java
deleted file mode 100644
index e75be23..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggers.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-/**
- * A mutable set which tracks whether any particular {@link ExecutableTrigger} is
- * finished.
- */
-public interface FinishedTriggers {
-  /**
-   * Returns {@code true} if the trigger is finished.
-   */
-  boolean isFinished(ExecutableTrigger<?> trigger);
-
-  /**
-   * Records whether {@code trigger} is finished: marks it finished when {@code value} is
-   * {@code true} and unfinished when {@code value} is {@code false}.
-   */
-  void setFinished(ExecutableTrigger<?> trigger, boolean value);
-
-  /**
-   * Sets the trigger and all of its subtriggers to unfinished.
-   */
-  void clearRecursively(ExecutableTrigger<?> trigger);
-
-  /**
-   * Creates an independent copy of this mutable {@link FinishedTriggers}; later changes to either
-   * one are not visible in the other.
-   */
-  FinishedTriggers copy();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersBitSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersBitSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersBitSet.java
deleted file mode 100644
index 09f7af7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersBitSet.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import java.util.BitSet;
-
-/**
- * A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}.
- */
-public class FinishedTriggersBitSet implements FinishedTriggers {
-
- private final BitSet bitSet;
-
- private FinishedTriggersBitSet(BitSet bitSet) {
- this.bitSet = bitSet;
- }
-
- public static FinishedTriggersBitSet emptyWithCapacity(int capacity) {
- return new FinishedTriggersBitSet(new BitSet(capacity));
- }
-
- public static FinishedTriggersBitSet fromBitSet(BitSet bitSet) {
- return new FinishedTriggersBitSet(bitSet);
- }
-
- /**
- * Returns the underlying {@link BitSet} for this {@link FinishedTriggersBitSet}.
- */
- public BitSet getBitSet() {
- return bitSet;
- }
-
- @Override
- public boolean isFinished(ExecutableTrigger<?> trigger) {
- return bitSet.get(trigger.getTriggerIndex());
- }
-
- @Override
- public void setFinished(ExecutableTrigger<?> trigger, boolean value) {
- bitSet.set(trigger.getTriggerIndex(), value);
- }
-
- @Override
- public void clearRecursively(ExecutableTrigger<?> trigger) {
- bitSet.clear(trigger.getTriggerIndex(), trigger.getFirstIndexAfterSubtree());
- }
-
- @Override
- public FinishedTriggersBitSet copy() {
- return new FinishedTriggersBitSet((BitSet) bitSet.clone());
- }
-}
-
-