You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:24 UTC

[37/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
deleted file mode 100644
index 632be6d..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.NanoClock;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
-import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowMetricUpdateExtractor;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
-import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
-import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A DataflowPipelineJob represents a job submitted to Dataflow using
- * {@link DataflowPipelineRunner}.
- */
-public class DataflowPipelineJob implements PipelineResult {
-  private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);
-
-  /**
-   * The id for the job.
-   */
-  private String jobId;
-
-  /**
-   * Google cloud project to associate this pipeline with.
-   */
-  private String projectId;
-
-  /**
-   * Client for the Dataflow service. This can be used to query the service
-   * for information about the job.
-   */
-  private Dataflow dataflowClient;
-
-  /**
-   * The state the job terminated in or {@code null} if the job has not terminated.
-   */
-  @Nullable
-  private State terminalState = null;
-
-  /**
-   * The job that replaced this one or {@code null} if the job has not been replaced.
-   */
-  @Nullable
-  private DataflowPipelineJob replacedByJob = null;
-
-  private DataflowAggregatorTransforms aggregatorTransforms;
-
-  /**
-   * The Metric Updates retrieved after the job was in a terminal state.
-   */
-  private List<MetricUpdate> terminalMetricUpdates;
-
-  /**
-   * The polling interval for job status and messages information.
-   */
-  static final long MESSAGES_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
-  static final long STATUS_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
-
-  /**
-   * The amount of polling attempts for job status and messages information.
-   */
-  static final int MESSAGES_POLLING_ATTEMPTS = 10;
-  static final int STATUS_POLLING_ATTEMPTS = 5;
-
-  /**
-   * Constructs the job.
-   *
-   * @param projectId the project id
-   * @param jobId the job id
-   * @param dataflowClient the client for the Dataflow Service
-   */
-  public DataflowPipelineJob(String projectId, String jobId, Dataflow dataflowClient,
-      DataflowAggregatorTransforms aggregatorTransforms) {
-    this.projectId = projectId;
-    this.jobId = jobId;
-    this.dataflowClient = dataflowClient;
-    this.aggregatorTransforms = aggregatorTransforms;
-  }
-
-  /**
-   * Get the id of this job.
-   */
-  public String getJobId() {
-    return jobId;
-  }
-
-  /**
-   * Get the project this job exists in.
-   */
-  public String getProjectId() {
-    return projectId;
-  }
-
-  /**
-   * Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable.
-   *
-   * @throws IllegalStateException if called before the job has terminated or if the job terminated
-   * but was not updated
-   */
-  public DataflowPipelineJob getReplacedByJob() {
-    if (terminalState == null) {
-      throw new IllegalStateException("getReplacedByJob() called before job terminated");
-    }
-    if (replacedByJob == null) {
-      throw new IllegalStateException("getReplacedByJob() called for job that was not replaced");
-    }
-    return replacedByJob;
-  }
-
-  /**
-   * Get the Cloud Dataflow API Client used by this job.
-   */
-  public Dataflow getDataflowClient() {
-    return dataflowClient;
-  }
-
-  /**
-   * Waits for the job to finish and return the final status.
-   *
-   * @param timeToWait The time to wait in units timeUnit for the job to finish.
-   *     Provide a value less than 1 ms for an infinite wait.
-   * @param timeUnit The unit of time for timeToWait.
-   * @param messageHandler If non null this handler will be invoked for each
-   *   batch of messages received.
-   * @return The final state of the job or null on timeout or if the
-   *   thread is interrupted.
-   * @throws IOException If there is a persistent problem getting job
-   *   information.
-   * @throws InterruptedException
-   */
-  @Nullable
-  public State waitToFinish(
-      long timeToWait,
-      TimeUnit timeUnit,
-      MonitoringUtil.JobMessagesHandler messageHandler)
-          throws IOException, InterruptedException {
-    return waitToFinish(timeToWait, timeUnit, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
-  }
-
-  /**
-   * Wait for the job to finish and return the final status.
-   *
-   * @param timeToWait The time to wait in units timeUnit for the job to finish.
-   *     Provide a value less than 1 ms for an infinite wait.
-   * @param timeUnit The unit of time for timeToWait.
-   * @param messageHandler If non null this handler will be invoked for each
-   *   batch of messages received.
-   * @param sleeper A sleeper to use to sleep between attempts.
-   * @param nanoClock A nanoClock used to time the total time taken.
-   * @return The final state of the job or null on timeout or if the
-   *   thread is interrupted.
-   * @throws IOException If there is a persistent problem getting job
-   *   information.
-   * @throws InterruptedException
-   */
-  @Nullable
-  @VisibleForTesting
-  State waitToFinish(
-      long timeToWait,
-      TimeUnit timeUnit,
-      MonitoringUtil.JobMessagesHandler messageHandler,
-      Sleeper sleeper,
-      NanoClock nanoClock)
-          throws IOException, InterruptedException {
-    MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowClient);
-
-    long lastTimestamp = 0;
-    BackOff backoff =
-        timeUnit.toMillis(timeToWait) > 0
-            ? new AttemptAndTimeBoundedExponentialBackOff(
-                MESSAGES_POLLING_ATTEMPTS,
-                MESSAGES_POLLING_INTERVAL,
-                timeUnit.toMillis(timeToWait),
-                AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
-                nanoClock)
-            : new AttemptBoundedExponentialBackOff(
-                MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL);
-    State state;
-    do {
-      // Get the state of the job before listing messages. This ensures we always fetch job
-      // messages after the job finishes to ensure we have all them.
-      state = getStateWithRetries(1, sleeper);
-      boolean hasError = state == State.UNKNOWN;
-
-      if (messageHandler != null && !hasError) {
-        // Process all the job messages that have accumulated so far.
-        try {
-          List<JobMessage> allMessages = monitor.getJobMessages(
-              jobId, lastTimestamp);
-
-          if (!allMessages.isEmpty()) {
-            lastTimestamp =
-                fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
-            messageHandler.process(allMessages);
-          }
-        } catch (GoogleJsonResponseException | SocketTimeoutException e) {
-          hasError = true;
-          LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
-          LOG.debug("Exception information:", e);
-        }
-      }
-
-      if (!hasError) {
-        backoff.reset();
-        // Check if the job is done.
-        if (state.isTerminal()) {
-          return state;
-        }
-      }
-    } while(BackOffUtils.next(sleeper, backoff));
-    LOG.warn("No terminal state was returned.  State value {}", state);
-    return null;  // Timed out.
-  }
-
-  /**
-   * Cancels the job.
-   * @throws IOException if there is a problem executing the cancel request.
-   */
-  public void cancel() throws IOException {
-    Job content = new Job();
-    content.setProjectId(projectId);
-    content.setId(jobId);
-    content.setRequestedState("JOB_STATE_CANCELLED");
-    dataflowClient.projects().jobs()
-        .update(projectId, jobId, content)
-        .execute();
-  }
-
-  @Override
-  public State getState() {
-    if (terminalState != null) {
-      return terminalState;
-    }
-
-    return getStateWithRetries(STATUS_POLLING_ATTEMPTS, Sleeper.DEFAULT);
-  }
-
-  /**
-   * Attempts to get the state. Uses exponential backoff on failure up to the maximum number
-   * of passed in attempts.
-   *
-   * @param attempts The amount of attempts to make.
-   * @param sleeper Object used to do the sleeps between attempts.
-   * @return The state of the job or State.UNKNOWN in case of failure.
-   */
-  @VisibleForTesting
-  State getStateWithRetries(int attempts, Sleeper sleeper) {
-    if (terminalState != null) {
-      return terminalState;
-    }
-    try {
-      Job job = getJobWithRetries(attempts, sleeper);
-      return MonitoringUtil.toState(job.getCurrentState());
-    } catch (IOException exn) {
-      // The only IOException that getJobWithRetries is permitted to throw is the final IOException
-      // that caused the failure of retry. Other exceptions are wrapped in an unchecked exceptions
-      // and will propagate.
-      return State.UNKNOWN;
-    }
-  }
-
-  /**
-   * Attempts to get the underlying {@link Job}. Uses exponential backoff on failure up to the
-   * maximum number of passed in attempts.
-   *
-   * @param attempts The amount of attempts to make.
-   * @param sleeper Object used to do the sleeps between attempts.
-   * @return The underlying {@link Job} object.
-   * @throws IOException When the maximum number of retries is exhausted, the last exception is
-   * thrown.
-   */
-  @VisibleForTesting
-  Job getJobWithRetries(int attempts, Sleeper sleeper) throws IOException {
-    AttemptBoundedExponentialBackOff backoff =
-        new AttemptBoundedExponentialBackOff(attempts, STATUS_POLLING_INTERVAL);
-
-    // Retry loop ends in return or throw
-    while (true) {
-      try {
-        Job job = dataflowClient
-            .projects()
-            .jobs()
-            .get(projectId, jobId)
-            .execute();
-        State currentState = MonitoringUtil.toState(job.getCurrentState());
-        if (currentState.isTerminal()) {
-          terminalState = currentState;
-          replacedByJob = new DataflowPipelineJob(
-              getProjectId(), job.getReplacedByJobId(), dataflowClient, aggregatorTransforms);
-        }
-        return job;
-      } catch (IOException exn) {
-        LOG.warn("There were problems getting current job status: {}.", exn.getMessage());
-        LOG.debug("Exception information:", exn);
-
-        if (!nextBackOff(sleeper, backoff)) {
-          throw exn;
-        }
-      }
-    }
-  }
-
-  /**
-   * Identical to {@link BackOffUtils#next} but without checked exceptions.
-   */
-  private boolean nextBackOff(Sleeper sleeper, BackOff backoff) {
-    try {
-      return BackOffUtils.next(sleeper, backoff);
-    } catch (InterruptedException | IOException e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator)
-      throws AggregatorRetrievalException {
-    try {
-      return new MapAggregatorValues<>(fromMetricUpdates(aggregator));
-    } catch (IOException e) {
-      throw new AggregatorRetrievalException(
-          "IOException when retrieving Aggregator values for Aggregator " + aggregator, e);
-    }
-  }
-
-  private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator)
-      throws IOException {
-    if (aggregatorTransforms.contains(aggregator)) {
-      List<MetricUpdate> metricUpdates;
-      if (terminalMetricUpdates != null) {
-        metricUpdates = terminalMetricUpdates;
-      } else {
-        boolean terminal = getState().isTerminal();
-        JobMetrics jobMetrics =
-            dataflowClient.projects().jobs().getMetrics(projectId, jobId).execute();
-        metricUpdates = jobMetrics.getMetrics();
-        if (terminal && jobMetrics.getMetrics() != null) {
-          terminalMetricUpdates = metricUpdates;
-        }
-      }
-
-      return DataflowMetricUpdateExtractor.fromMetricUpdates(
-          aggregator, aggregatorTransforms, metricUpdates);
-    } else {
-      throw new IllegalArgumentException(
-          "Aggregator " + aggregator + " is not used in this pipeline");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
deleted file mode 100644
index 8aaa7cc..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.auto.service.AutoService;
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for
- * the {@link DataflowPipeline}.
- */
-public class DataflowPipelineRegistrar {
-  private DataflowPipelineRegistrar() { }
-
-  /**
-   * Register the {@link DataflowPipelineOptions} and {@link BlockingDataflowPipelineOptions}.
-   */
-  @AutoService(PipelineOptionsRegistrar.class)
-  public static class Options implements PipelineOptionsRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(
-          DataflowPipelineOptions.class,
-          BlockingDataflowPipelineOptions.class);
-    }
-  }
-
-  /**
-   * Register the {@link DataflowPipelineRunner} and {@link BlockingDataflowPipelineRunner}.
-   */
-  @AutoService(PipelineRunnerRegistrar.class)
-  public static class Runner implements PipelineRunnerRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
-          DataflowPipelineRunner.class,
-          BlockingDataflowPipelineRunner.class);
-    }
-  }
-}