You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:48 UTC

[46/50] [abbrv] tez git commit: TEZ-2284. Separate TaskReporter into an interface. (sseth)

TEZ-2284. Separate TaskReporter into an interface. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/854a4397
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/854a4397
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/854a4397

Branch: refs/heads/TEZ-2003
Commit: 854a43972d6dc1895423b4dfcb07a625d76b3db4
Parents: d99aae5
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Apr 7 13:21:35 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 00:18:08 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../internals/api/TaskReporterInterface.java    | 46 ++++++++++++++++++++
 .../apache/tez/runtime/task/TaskReporter.java   | 12 ++++-
 .../org/apache/tez/runtime/task/TezChild.java   |  3 +-
 .../apache/tez/runtime/task/TezTaskRunner.java  |  5 ++-
 5 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/854a4397/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 6a4399c..e2c428d 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -13,5 +13,6 @@ ALL CHANGES:
   TEZ-2187. Allow TaskCommunicators to report failed / killed attempts.
   TEZ-2241. Miscellaneous fixes after last reabse.
   TEZ-2283. Fixes after rebase 04/07.
+  TEZ-2284. Separate TaskReporter into an interface.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/854a4397/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
new file mode 100644
index 0000000..47a61ab
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.internals.api;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.task.ErrorReporter;
+
+public interface TaskReporterInterface {
+
+  // TODO TEZ-2003 Consolidate private API usage if making this public
+
+  void registerTask(RuntimeTask task, ErrorReporter errorReporter);
+
+  void unregisterTask(TezTaskAttemptID taskAttemptId);
+
+  boolean taskSucceeded(TezTaskAttemptID taskAttemptId) throws IOException, TezException;
+
+  boolean taskFailed(TezTaskAttemptID taskAttemptId, Throwable cause, String diagnostics, EventMetaData srcMeta) throws IOException,
+      TezException;
+
+  void addEvents(TezTaskAttemptID taskAttemptId, Collection<TezEvent> events);
+
+  boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
+
+  void shutdown();
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/854a4397/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 7324abd..98aa55a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -47,6 +47,7 @@ import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +66,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * retrieve events specific to this task.
  * 
  */
-public class TaskReporter {
+public class TaskReporter implements TaskReporterInterface {
 
   private static final Logger LOG = LoggerFactory.getLogger(TaskReporter.class);
 
@@ -97,6 +98,7 @@ public class TaskReporter {
   /**
    * Register a task to be tracked. Heartbeats will be sent out for this task to fetch events, etc.
    */
+  @Override
   public synchronized void registerTask(RuntimeTask task,
       ErrorReporter errorReporter) {
     currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
@@ -109,11 +111,13 @@ public class TaskReporter {
    * This method should always be invoked before setting up heartbeats for another task running in
    * the same container.
    */
+  @Override
   public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
     currentCallable.markComplete();
     currentCallable = null;
   }
-  
+
+  @Override
   public void shutdown() {
     heartbeatExecutor.shutdownNow();
   }
@@ -388,19 +392,23 @@ public class TaskReporter {
     }
   }
 
+  @Override
   public synchronized boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
     return currentCallable.taskSucceeded(taskAttemptID);
   }
 
+  @Override
   public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
       EventMetaData srcMeta) throws IOException, TezException {
     return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
   }
 
+  @Override
   public synchronized void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
     currentCallable.addEvents(taskAttemptID, events);
   }
 
+  @Override
   public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
     return umbilical.canCommit(taskAttemptID);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/854a4397/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 7615f08..c4fd64c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -66,6 +66,7 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,7 +112,7 @@ public class TezChild {
   private final boolean ownUmbilical;
 
   private final TezTaskUmbilicalProtocol umbilical;
-  private TaskReporter taskReporter;
+  private TaskReporterInterface taskReporter;
   private int taskCount = 0;
   private TezVertexID lastVertexID;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/854a4397/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index f54814b..33a7f4a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -41,6 +41,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +57,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
   private final LogicalIOProcessorRuntimeTask task;
   private final UserGroupInformation ugi;
 
-  private final TaskReporter taskReporter;
+  private final TaskReporterInterface taskReporter;
   private final ListeningExecutorService executor;
   private volatile ListenableFuture<Void> taskFuture;
   private volatile Thread waitingThread;
@@ -70,7 +71,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
   public TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
       TaskSpec taskSpec, int appAttemptNumber,
       Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap,
-      Multimap<String, String> startedInputsMap, TaskReporter taskReporter,
+      Multimap<String, String> startedInputsMap, TaskReporterInterface taskReporter,
       ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid,
       ExecutionContext executionContext, long memAvailable)
           throws IOException {