You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:48 UTC
[46/50] [abbrv] tez git commit: TEZ-2284. Separate TaskReporter into
an interface. (sseth)
TEZ-2284. Separate TaskReporter into an interface. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/854a4397
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/854a4397
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/854a4397
Branch: refs/heads/TEZ-2003
Commit: 854a43972d6dc1895423b4dfcb07a625d76b3db4
Parents: d99aae5
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Apr 7 13:21:35 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 00:18:08 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../internals/api/TaskReporterInterface.java | 46 ++++++++++++++++++++
.../apache/tez/runtime/task/TaskReporter.java | 12 ++++-
.../org/apache/tez/runtime/task/TezChild.java | 3 +-
.../apache/tez/runtime/task/TezTaskRunner.java | 5 ++-
5 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/854a4397/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 6a4399c..e2c428d 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -13,5 +13,6 @@ ALL CHANGES:
TEZ-2187. Allow TaskCommunicators to report failed / killed attempts.
TEZ-2241. Miscellaneous fixes after last reabse.
TEZ-2283. Fixes after rebase 04/07.
+ TEZ-2284. Separate TaskReporter into an interface.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/854a4397/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
new file mode 100644
index 0000000..47a61ab
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.internals.api;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.task.ErrorReporter;
+
+public interface TaskReporterInterface {
+
+ // TODO TEZ-2003 Consolidate private API usage if making this public
+
+ void registerTask(RuntimeTask task, ErrorReporter errorReporter);
+
+ void unregisterTask(TezTaskAttemptID taskAttemptId);
+
+ boolean taskSucceeded(TezTaskAttemptID taskAttemptId) throws IOException, TezException;
+
+ boolean taskFailed(TezTaskAttemptID taskAttemptId, Throwable cause, String diagnostics, EventMetaData srcMeta) throws IOException,
+ TezException;
+
+ void addEvents(TezTaskAttemptID taskAttemptId, Collection<TezEvent> events);
+
+ boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
+
+ void shutdown();
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/854a4397/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 7324abd..98aa55a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -47,6 +47,7 @@ import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +66,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* retrieve events specific to this task.
*
*/
-public class TaskReporter {
+public class TaskReporter implements TaskReporterInterface {
private static final Logger LOG = LoggerFactory.getLogger(TaskReporter.class);
@@ -97,6 +98,7 @@ public class TaskReporter {
/**
* Register a task to be tracked. Heartbeats will be sent out for this task to fetch events, etc.
*/
+ @Override
public synchronized void registerTask(RuntimeTask task,
ErrorReporter errorReporter) {
currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
@@ -109,11 +111,13 @@ public class TaskReporter {
* This method should always be invoked before setting up heartbeats for another task running in
* the same container.
*/
+ @Override
public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
currentCallable.markComplete();
currentCallable = null;
}
-
+
+ @Override
public void shutdown() {
heartbeatExecutor.shutdownNow();
}
@@ -388,19 +392,23 @@ public class TaskReporter {
}
}
+ @Override
public synchronized boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
return currentCallable.taskSucceeded(taskAttemptID);
}
+ @Override
public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
EventMetaData srcMeta) throws IOException, TezException {
return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
}
+ @Override
public synchronized void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
currentCallable.addEvents(taskAttemptID, events);
}
+ @Override
public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
return umbilical.canCommit(taskAttemptID);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/854a4397/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 7615f08..c4fd64c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -66,6 +66,7 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,7 +112,7 @@ public class TezChild {
private final boolean ownUmbilical;
private final TezTaskUmbilicalProtocol umbilical;
- private TaskReporter taskReporter;
+ private TaskReporterInterface taskReporter;
private int taskCount = 0;
private TezVertexID lastVertexID;
http://git-wip-us.apache.org/repos/asf/tez/blob/854a4397/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index f54814b..33a7f4a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -41,6 +41,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +57,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
private final LogicalIOProcessorRuntimeTask task;
private final UserGroupInformation ugi;
- private final TaskReporter taskReporter;
+ private final TaskReporterInterface taskReporter;
private final ListeningExecutorService executor;
private volatile ListenableFuture<Void> taskFuture;
private volatile Thread waitingThread;
@@ -70,7 +71,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
public TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
TaskSpec taskSpec, int appAttemptNumber,
Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap,
- Multimap<String, String> startedInputsMap, TaskReporter taskReporter,
+ Multimap<String, String> startedInputsMap, TaskReporterInterface taskReporter,
ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid,
ExecutionContext executionContext, long memAvailable)
throws IOException {