You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:18 UTC
[09/20] Rename tez-engine-api to tez-runtime-api and tez-engine is
split into 2: - tez-engine-library for user-visible Input/Output/Processor
implementations - tez-engine-internals for framework internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
index e5bd108..12a3740 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
@@ -21,8 +21,8 @@ package org.apache.tez.mapreduce.hadoop;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.Constants;
import org.junit.Test;
public class TestDeprecatedKeys {
@@ -43,22 +43,22 @@ public class TestDeprecatedKeys {
MultiStageMRConfToTezTranslator.translateVertexConfToTez(jobConf, null);
assertEquals(0.4f, jobConf.getFloat(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT, 0f), 0.01f);
- assertEquals(20000l, jobConf.getLong(Constants.TEZ_ENGINE_TASK_MEMORY, 0));
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0f), 0.01f);
+ assertEquals(20000l, jobConf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY, 0));
assertEquals(2000,
- jobConf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 0));
+ jobConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
assertEquals(0.55f, jobConf.getFloat(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT, 0), 0.01f);
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0), 0.01f);
assertEquals(0.60f,
- jobConf.getFloat(TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS, 0),
+ jobConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 0),
0.01f);
assertEquals(0.22f,
- jobConf.getFloat(TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT, 0),
+ jobConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0),
0.01f);
assertEquals(true, jobConf.getBoolean(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM, false));
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, false));
assertEquals(0.33f,
- jobConf.getFloat(TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT, 0),
+ jobConf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, 0),
0.01f);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 4b2c0e8..9590e72 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -48,15 +48,15 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.engine.api.impl.InputSpec;
-import org.apache.tez.engine.api.impl.OutputSpec;
-import org.apache.tez.engine.api.impl.TaskSpec;
-import org.apache.tez.engine.api.impl.TezUmbilical;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
public class MapUtils {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 89292ab..5b8eedf 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -31,19 +31,9 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.impl.InputSpec;
-import org.apache.tez.engine.api.impl.OutputSpec;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
-import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.mapreduce.TestUmbilical;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
@@ -51,6 +41,16 @@ import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.processor.MapUtils;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+import org.apache.tez.runtime.library.output.LocalOnFileSorterOutput;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -85,10 +85,10 @@ public class TestMapProcessor {
job.set(TezJobConfig.LOCAL_DIRS, workDir.toString());
job.set(MRConfig.LOCAL_DIR, workDir.toString());
job.setClass(
- Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+ Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
TezLocalTaskOutputFiles.class,
TezTaskOutput.class);
- job.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName());
+ job.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName());
job.setNumReduceTasks(1);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 274c353..d2c7952 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -33,21 +33,11 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.security.token.Token;
-import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.engine.api.impl.InputSpec;
-import org.apache.tez.engine.api.impl.OutputSpec;
-import org.apache.tez.engine.api.impl.TaskSpec;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.input.LocalMergedInput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
-import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.mapreduce.TestUmbilical;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.hadoop.IDConverter;
@@ -58,6 +48,16 @@ import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.processor.MapUtils;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
+import org.apache.tez.runtime.library.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+import org.apache.tez.runtime.library.input.LocalMergedInput;
+import org.apache.tez.runtime.library.output.LocalOnFileSorterOutput;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -90,10 +90,10 @@ public class TestReduceProcessor {
job.set(TezJobConfig.LOCAL_DIRS, workDir.toString());
job.set(MRConfig.LOCAL_DIR, workDir.toString());
job.setClass(
- Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+ Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
TezLocalTaskOutputFiles.class,
TezTaskOutput.class);
- job.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName());
+ job.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName());
job.setNumReduceTasks(1);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/findbugs-exclude.xml b/tez-runtime-internals/findbugs-exclude.xml
new file mode 100644
index 0000000..5b11308
--- /dev/null
+++ b/tez-runtime-internals/findbugs-exclude.xml
@@ -0,0 +1,16 @@
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<FindBugsFilter>
+
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml
new file mode 100644
index 0000000..4f64701
--- /dev/null
+++ b/tez-runtime-internals/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>tez-runtime-internals</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-library</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>Events.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerContext.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerContext.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerContext.java
new file mode 100644
index 0000000..df92bdc
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerContext.java
@@ -0,0 +1,64 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+// TODO EVENTUALLY move this over to PB. Fix package/module.
+// TODO EVENTUALLY unit tests for functionality.
+public class ContainerContext implements Writable {
+
+ String containerIdentifier;
+ String pid;
+
+ public ContainerContext() {
+ containerIdentifier = "";
+ pid = "";
+ }
+
+ public ContainerContext(String containerIdStr, String pid) {
+ this.containerIdentifier = containerIdStr;
+ this.pid = pid;
+ }
+
+ public String getContainerIdentifier() {
+ return containerIdentifier;
+ }
+
+ public String getPid() {
+ return pid;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.containerIdentifier = Text.readString(in);
+ this.pid = Text.readString(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, containerIdentifier);
+ Text.writeString(out, pid);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java
new file mode 100644
index 0000000..c865631
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+public class ContainerTask implements Writable {
+
+ TaskSpec taskSpec;
+ boolean shouldDie;
+
+ public ContainerTask() {
+ }
+
+ public ContainerTask(TaskSpec taskSpec, boolean shouldDie) {
+ this.taskSpec = taskSpec;
+ this.shouldDie = shouldDie;
+ }
+
+ public TaskSpec getTaskSpec() {
+ return taskSpec;
+ }
+
+ public boolean shouldDie() {
+ return shouldDie;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(shouldDie);
+ if (taskSpec != null) {
+ out.writeBoolean(true);
+ taskSpec.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ shouldDie = in.readBoolean();
+ boolean taskComing = in.readBoolean();
+ if (taskComing) {
+ taskSpec = new TaskSpec();
+ taskSpec.readFields(in);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "shouldDie: " + shouldDie + ", TaskSpec: "
+ + taskSpec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
new file mode 100644
index 0000000..1966790
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.tez.common.records.ProceedToCompletionResponse;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+
+/** Protocol that task child process uses to contact its parent process. The
+ * parent is a daemon which which polls the central master for a new map or
+ * reduce task and runs it as a child process. All communication between child
+ * and parent is via this protocol. */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+@ProtocolInfo(protocolName = "TezTaskUmbilicalProtocol", protocolVersion = 1)
+public interface TezTaskUmbilicalProtocol extends VersionedProtocol {
+
+ public static final long versionID = 19L;
+
+ ContainerTask getTask(ContainerContext containerContext) throws IOException;
+
+ boolean canCommit(TezTaskAttemptID taskid) throws IOException;
+
+ ProceedToCompletionResponse
+ proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException;
+
+ /// Copies from TezUmbilical until complete re-factor is done
+ // TODONEWTEZ
+
+ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
+ throws IOException, TezException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
new file mode 100644
index 0000000..8aff6d1
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.Output;
+import org.apache.tez.runtime.api.Processor;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezInputContextImpl;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+@Private
+public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
+
+ private static final Log LOG = LogFactory
+ .getLog(LogicalIOProcessorRuntimeTask.class);
+
+ private final List<InputSpec> inputSpecs;
+ private final List<LogicalInput> inputs;
+
+ private final List<OutputSpec> outputSpecs;
+ private final List<LogicalOutput> outputs;
+
+ private List<TezInputContext> inputContexts;
+ private List<TezOutputContext> outputContexts;
+ private TezProcessorContext processorContext;
+
+ private final ProcessorDescriptor processorDescriptor;
+ private final LogicalIOProcessor processor;
+
+ private final Map<String, ByteBuffer> serviceConsumerMetadata;
+
+ private Map<String, LogicalInput> inputMap;
+ private Map<String, LogicalOutput> outputMap;
+
+ private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
+ private Thread eventRouterThread = null;
+
+ private final int appAttemptNumber;
+
+ public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
+ Configuration tezConf, TezUmbilical tezUmbilical,
+ Token<JobTokenIdentifier> jobToken) throws IOException {
+ // TODO Remove jobToken from here post TEZ-421
+ super(taskSpec, tezConf, tezUmbilical);
+ LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
+ + taskSpec);
+ this.inputContexts = new ArrayList<TezInputContext>(taskSpec.getInputs().size());
+ this.outputContexts = new ArrayList<TezOutputContext>(taskSpec.getOutputs().size());
+ this.inputSpecs = taskSpec.getInputs();
+ this.inputs = createInputs(inputSpecs);
+ this.outputSpecs = taskSpec.getOutputs();
+ this.outputs = createOutputs(outputSpecs);
+ this.processorDescriptor = taskSpec.getProcessorDescriptor();
+ this.processor = createProcessor(processorDescriptor);
+ this.serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+ this.serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+ ShuffleUtils.convertJobTokenToBytes(jobToken));
+ this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
+ this.state = State.NEW;
+ this.appAttemptNumber = appAttemptNumber;
+ }
+
+ public void initialize() throws Exception {
+ LOG.info("Initializing LogicalProcessorIORuntimeTask");
+ Preconditions.checkState(this.state == State.NEW, "Already initialized");
+ this.state = State.INITED;
+ inputMap = new LinkedHashMap<String, LogicalInput>(inputs.size());
+ outputMap = new LinkedHashMap<String, LogicalOutput>(outputs.size());
+
+ // TODO Maybe close initialized inputs / outputs in case of failure to
+ // initialize.
+ // Initialize all inputs. TODO: Multi-threaded at some point.
+ for (int i = 0; i < inputs.size(); i++) {
+ String srcVertexName = inputSpecs.get(i).getSourceVertexName();
+ initializeInput(inputs.get(i),
+ inputSpecs.get(i));
+ inputMap.put(srcVertexName, inputs.get(i));
+ }
+
+ // Initialize all outputs. TODO: Multi-threaded at some point.
+ for (int i = 0; i < outputs.size(); i++) {
+ String destVertexName = outputSpecs.get(i).getDestinationVertexName();
+ initializeOutput(outputs.get(i), outputSpecs.get(i));
+ outputMap.put(destVertexName, outputs.get(i));
+ }
+
+ // Initialize processor.
+ initializeLogicalIOProcessor();
+ startRouterThread();
+ }
+
+ public void run() throws Exception {
+ synchronized (this.state) {
+ Preconditions.checkState(this.state == State.INITED,
+ "Can only run while in INITED state. Current: " + this.state);
+ this.state = State.RUNNING;
+ }
+ LogicalIOProcessor lioProcessor = (LogicalIOProcessor) processor;
+ lioProcessor.run(inputMap, outputMap);
+ }
+
+ public void close() throws Exception {
+ try {
+ Preconditions.checkState(this.state == State.RUNNING,
+ "Can only run while in RUNNING state. Current: " + this.state);
+ this.state = State.CLOSED;
+
+ // Close the Inputs.
+ for (int i = 0; i < inputs.size(); i++) {
+ String srcVertexName = inputSpecs.get(i).getSourceVertexName();
+ List<Event> closeInputEvents = inputs.get(i).close();
+ sendTaskGeneratedEvents(closeInputEvents,
+ EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
+ srcVertexName, taskSpec.getTaskAttemptID());
+ }
+
+ // Close the Processor.
+ processor.close();
+
+ // Close the Outputs.
+ for (int i = 0; i < outputs.size(); i++) {
+ String destVertexName = outputSpecs.get(i).getDestinationVertexName();
+ List<Event> closeOutputEvents = outputs.get(i).close();
+ sendTaskGeneratedEvents(closeOutputEvents,
+ EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
+ destVertexName, taskSpec.getTaskAttemptID());
+ }
+ } finally {
+ setTaskDone();
+ if (eventRouterThread != null) {
+ eventRouterThread.interrupt();
+ }
+ }
+ }
+
+ private void initializeInput(Input input, InputSpec inputSpec)
+ throws Exception {
+ TezInputContext tezInputContext = createInputContext(inputSpec);
+ inputContexts.add(tezInputContext);
+ if (input instanceof LogicalInput) {
+ ((LogicalInput) input).setNumPhysicalInputs(inputSpec
+ .getPhysicalEdgeCount());
+ }
+ LOG.info("Initializing Input using InputSpec: " + inputSpec);
+ List<Event> events = input.initialize(tezInputContext);
+ sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
+ tezInputContext.getTaskVertexName(),
+ tezInputContext.getSourceVertexName(), taskSpec.getTaskAttemptID());
+ }
+
+ private void initializeOutput(Output output, OutputSpec outputSpec)
+ throws Exception {
+ TezOutputContext tezOutputContext = createOutputContext(outputSpec);
+ outputContexts.add(tezOutputContext);
+ if (output instanceof LogicalOutput) {
+ ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
+ .getPhysicalEdgeCount());
+ }
+ LOG.info("Initializing Output using OutputSpec: " + outputSpec);
+ List<Event> events = output.initialize(tezOutputContext);
+ sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
+ tezOutputContext.getTaskVertexName(),
+ tezOutputContext.getDestinationVertexName(),
+ taskSpec.getTaskAttemptID());
+ }
+
+ private void initializeLogicalIOProcessor() throws Exception {
+ LOG.info("Initializing processor"
+ + ", processorClassName=" + processorDescriptor.getClassName());
+ TezProcessorContext processorContext = createProcessorContext();
+ this.processorContext = processorContext;
+ processor.initialize(processorContext);
+ }
+
+ private TezInputContext createInputContext(InputSpec inputSpec) {
+ TezInputContext inputContext = new TezInputContextImpl(tezConf,
+ appAttemptNumber, tezUmbilical, taskSpec.getVertexName(),
+ inputSpec.getSourceVertexName(), taskSpec.getTaskAttemptID(),
+ tezCounters,
+ inputSpec.getInputDescriptor().getUserPayload() == null ? taskSpec
+ .getProcessorDescriptor().getUserPayload() : inputSpec
+ .getInputDescriptor().getUserPayload(), this,
+ serviceConsumerMetadata);
+ return inputContext;
+ }
+
+ private TezOutputContext createOutputContext(OutputSpec outputSpec) {
+ TezOutputContext outputContext = new TezOutputContextImpl(tezConf,
+ appAttemptNumber, tezUmbilical, taskSpec.getVertexName(),
+ outputSpec.getDestinationVertexName(), taskSpec.getTaskAttemptID(),
+ tezCounters,
+ outputSpec.getOutputDescriptor().getUserPayload() == null ? taskSpec
+ .getProcessorDescriptor().getUserPayload() : outputSpec
+ .getOutputDescriptor().getUserPayload(), this,
+ serviceConsumerMetadata);
+ return outputContext;
+ }
+
+ private TezProcessorContext createProcessorContext() {
+ TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf,
+ appAttemptNumber, tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
+ tezCounters, processorDescriptor.getUserPayload(), this,
+ serviceConsumerMetadata);
+ return processorContext;
+ }
+
+ private List<LogicalInput> createInputs(List<InputSpec> inputSpecs) {
+ List<LogicalInput> inputs = new ArrayList<LogicalInput>(inputSpecs.size());
+ for (InputSpec inputSpec : inputSpecs) {
+ LOG.info("Creating Input from InputSpec: "
+ + inputSpec);
+ Input input = RuntimeUtils.createClazzInstance(inputSpec
+ .getInputDescriptor().getClassName());
+
+ if (input instanceof LogicalInput) {
+ inputs.add((LogicalInput) input);
+ } else {
+ throw new TezUncheckedException(input.getClass().getName()
+ + " is not a sub-type of LogicalInput."
+ + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
+ }
+ }
+ return inputs;
+ }
+
+ private List<LogicalOutput> createOutputs(List<OutputSpec> outputSpecs) {
+ List<LogicalOutput> outputs = new ArrayList<LogicalOutput>(
+ outputSpecs.size());
+ for (OutputSpec outputSpec : outputSpecs) {
+ LOG.info("Creating Output from OutputSpec"
+ + outputSpec);
+ Output output = RuntimeUtils.createClazzInstance(outputSpec
+ .getOutputDescriptor().getClassName());
+ if (output instanceof LogicalOutput) {
+ outputs.add((LogicalOutput) output);
+ } else {
+ throw new TezUncheckedException(output.getClass().getName()
+ + " is not a sub-type of LogicalOutput."
+ + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
+ }
+ }
+ return outputs;
+ }
+
+ private LogicalIOProcessor createProcessor(
+ ProcessorDescriptor processorDescriptor) {
+ Processor processor = RuntimeUtils.createClazzInstance(processorDescriptor
+ .getClassName());
+ if (!(processor instanceof LogicalIOProcessor)) {
+ throw new TezUncheckedException(processor.getClass().getName()
+ + " is not a sub-type of LogicalIOProcessor."
+ + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
+ }
+ return (LogicalIOProcessor) processor;
+ }
+
+ private void sendTaskGeneratedEvents(List<Event> events,
+ EventProducerConsumerType generator, String taskVertexName,
+ String edgeVertexName, TezTaskAttemptID taskAttemptID) {
+ if (events == null || events.isEmpty()) {
+ return;
+ }
+ EventMetaData eventMetaData = new EventMetaData(generator,
+ taskVertexName, edgeVertexName, taskAttemptID);
+ List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
+ for (Event e : events) {
+ TezEvent te = new TezEvent(e, eventMetaData);
+ tezEvents.add(te);
+ }
+ if (LOG.isDebugEnabled()) {
+ for (TezEvent e : tezEvents) {
+ LOG.debug("Generated event info"
+ + ", eventMetaData=" + eventMetaData.toString()
+ + ", eventType=" + e.getEventType());
+ }
+ }
+ tezUmbilical.addEvents(tezEvents);
+ }
+
+ private boolean handleEvent(TezEvent e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Handling TezEvent in task"
+ + ", taskAttemptId=" + taskSpec.getTaskAttemptID()
+ + ", eventType=" + e.getEventType()
+ + ", eventSourceInfo=" + e.getSourceInfo()
+ + ", eventDestinationInfo=" + e.getDestinationInfo());
+ }
+ try {
+ switch (e.getDestinationInfo().getEventGenerator()) {
+ case INPUT:
+ LogicalInput input = inputMap.get(
+ e.getDestinationInfo().getEdgeVertexName());
+ if (input != null) {
+ input.handleEvents(Collections.singletonList(e.getEvent()));
+ } else {
+ throw new TezUncheckedException("Unhandled event for invalid target: "
+ + e);
+ }
+ break;
+ case OUTPUT:
+ LogicalOutput output = outputMap.get(
+ e.getDestinationInfo().getEdgeVertexName());
+ if (output != null) {
+ output.handleEvents(Collections.singletonList(e.getEvent()));
+ } else {
+ throw new TezUncheckedException("Unhandled event for invalid target: "
+ + e);
+ }
+ break;
+ case PROCESSOR:
+ processor.handleEvents(Collections.singletonList(e.getEvent()));
+ break;
+ case SYSTEM:
+ LOG.warn("Trying to send a System event in a Task: " + e);
+ break;
+ }
+ } catch (Throwable t) {
+ LOG.warn("Failed to handle event", t);
+ setFatalError(t, "Failed to handle event");
+ EventMetaData sourceInfo = new EventMetaData(
+ e.getDestinationInfo().getEventGenerator(),
+ taskSpec.getVertexName(), e.getDestinationInfo().getEdgeVertexName(),
+ getTaskAttemptID());
+ tezUmbilical.signalFatalError(getTaskAttemptID(),
+ StringUtils.stringifyException(t), sourceInfo);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public synchronized void handleEvents(Collection<TezEvent> events) {
+ if (events == null || events.isEmpty()) {
+ return;
+ }
+ eventCounter.addAndGet(events.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received events to be processed by task"
+ + ", taskAttemptId=" + taskSpec.getTaskAttemptID()
+ + ", eventCount=" + events.size()
+ + ", newEventCounter=" + eventCounter.get());
+ }
+ eventsToBeProcessed.addAll(events);
+ }
+
+ private void startRouterThread() {
+ eventRouterThread = new Thread(new Runnable() {
+ public void run() {
+ while (!isTaskDone() && !Thread.currentThread().isInterrupted()) {
+ try {
+ TezEvent e = eventsToBeProcessed.take();
+ if (e == null) {
+ continue;
+ }
+ // TODO TODONEWTEZ
+ if (!handleEvent(e)) {
+ LOG.warn("Stopping Event Router thread as failed to handle"
+ + " event: " + e);
+ return;
+ }
+ } catch (InterruptedException e) {
+ if (!isTaskDone()) {
+ LOG.warn("Event Router thread interrupted. Returning.");
+ }
+ return;
+ }
+ }
+ }
+ });
+
+ eventRouterThread.setName("TezTaskEventRouter["
+ + taskSpec.getTaskAttemptID().toString() + "]");
+ eventRouterThread.start();
+ }
+
+ public synchronized void cleanup() {
+ setTaskDone();
+ if (eventRouterThread != null) {
+ eventRouterThread.interrupt();
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public List<TezInputContext> getInputContexts() {
+ return this.inputContexts;
+ }
+
+ @Private
+ @VisibleForTesting
+ public List<TezOutputContext> getOutputContexts() {
+ return this.outputContexts;
+ }
+
+ @Private
+ @VisibleForTesting
+ public TezProcessorContext getProcessorContext() {
+ return this.processorContext;
+ }
+
+ @Private
+ @VisibleForTesting
+ public Map<String, LogicalInput> getInputs() {
+ return this.inputMap;
+ }
+
+ @Private
+ @VisibleForTesting
+ public Map<String, LogicalOutput> getOutputs() {
+ return this.outputMap;
+ }
+
+ @Private
+ @VisibleForTesting
+ public LogicalIOProcessor getProcessor() {
+ return this.processor;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
new file mode 100644
index 0000000..f018333
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+
+public abstract class RuntimeTask {
+
+ protected AtomicBoolean hasFatalError = new AtomicBoolean(false);
+ protected Throwable fatalError = null;
+ protected String fatalErrorMessage = null;
+ protected float progress;
+ protected final TezCounters tezCounters;
+ protected final TaskSpec taskSpec;
+ protected final Configuration tezConf;
+ protected final TezUmbilical tezUmbilical;
+ protected final AtomicInteger eventCounter;
+ private final AtomicBoolean taskDone;
+
+ protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
+ TezUmbilical tezUmbilical) {
+ this.taskSpec = taskSpec;
+ this.tezConf = tezConf;
+ this.tezUmbilical = tezUmbilical;
+ this.tezCounters = new TezCounters();
+ this.eventCounter = new AtomicInteger(0);
+ this.progress = 0.0f;
+ this.taskDone = new AtomicBoolean(false);
+ }
+
+ protected enum State {
+ NEW, INITED, RUNNING, CLOSED;
+ }
+
+ protected State state;
+
+ public String getVertexName() {
+ return taskSpec.getVertexName();
+ }
+
+ public void setFatalError(Throwable t, String message) {
+ hasFatalError.set(true);
+ this.fatalError = t;
+ this.fatalErrorMessage = message;
+ }
+
+ public boolean hadFatalError() {
+ return hasFatalError.get();
+ }
+
+ public synchronized void setProgress(float progress) {
+ this.progress = progress;
+ }
+
+ public synchronized float getProgress() {
+ return this.progress;
+ }
+
+ public TezCounters getCounters() {
+ return this.tezCounters;
+ }
+
+ public TezTaskAttemptID getTaskAttemptID() {
+ return taskSpec.getTaskAttemptID();
+ }
+
+ public abstract void handleEvents(Collection<TezEvent> events);
+
+ public int getEventCounter() {
+ return eventCounter.get();
+ }
+
+ public boolean isTaskDone() {
+ return taskDone.get();
+ }
+
+ protected void setTaskDone() {
+ taskDone.set(true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
new file mode 100644
index 0000000..8a1b550
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.tez.dag.api.TezUncheckedException;
+
+public class RuntimeUtils {
+
+ private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
+
+ private static Class<?> getClazz(String className) {
+ Class<?> clazz = CLAZZ_CACHE.get(className);
+ if (clazz == null) {
+ try {
+ clazz = Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new TezUncheckedException("Unable to load class: " + className, e);
+ }
+ }
+ return clazz;
+ }
+
+ private static <T> T getNewInstance(Class<T> clazz) {
+ T instance;
+ try {
+ instance = clazz.newInstance();
+ } catch (InstantiationException e) {
+ throw new TezUncheckedException(
+ "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+ } catch (IllegalAccessException e) {
+ throw new TezUncheckedException(
+ "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+ }
+ return instance;
+ }
+
+ public static <T> T createClazzInstance(String className) {
+ Class<?> clazz = getClazz(className);
+ @SuppressWarnings("unchecked")
+ T instance = (T) getNewInstance(clazz);
+ return instance;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptCompletedEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptCompletedEvent.java
new file mode 100644
index 0000000..597718f
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptCompletedEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import org.apache.tez.runtime.api.Event;
+
+public class TaskAttemptCompletedEvent extends Event {
+
+ public TaskAttemptCompletedEvent() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java
new file mode 100644
index 0000000..935fdbb
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import org.apache.tez.runtime.api.Event;
+
+public class TaskAttemptFailedEvent extends Event {
+
+ private final String diagnostics;
+
+ public TaskAttemptFailedEvent(String diagnostics) {
+ this.diagnostics = diagnostics;
+ }
+
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
new file mode 100644
index 0000000..47c2998
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.Event;
+
+public class TaskStatusUpdateEvent extends Event implements Writable {
+
+ private TezCounters tezCounters;
+ private float progress;
+
+ public TaskStatusUpdateEvent() {
+ }
+
+ public TaskStatusUpdateEvent(TezCounters tezCounters, float progress) {
+ this.tezCounters = tezCounters;
+ this.progress = progress;
+ }
+
+ public TezCounters getCounters() {
+ return tezCounters;
+ }
+
+ public float getProgress() {
+ return progress;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeFloat(progress);
+ if (tezCounters != null) {
+ out.writeBoolean(true);
+ tezCounters.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ progress = in.readFloat();
+ if (in.readBoolean()) {
+ tezCounters = new TezCounters();
+ tezCounters.readFields(in);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
new file mode 100644
index 0000000..d650fa3
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Class that encapsulates all the information to identify the unique
+ * object that either generated an Event or is the recipient of an Event.
+ */
+public class EventMetaData implements Writable {
+
+ public static enum EventProducerConsumerType {
+ INPUT,
+ PROCESSOR,
+ OUTPUT,
+ SYSTEM
+ }
+
+ /**
+ * Producer Type ( one of Input/Output/Processor ) that generated the Event
+ * or Consumer Type that will consume the Event.
+ */
+ private EventProducerConsumerType producerConsumerType;
+
+ /**
+ * Name of the vertex where the event was generated.
+ */
+ private String taskVertexName;
+
+ /**
+ * Name of the vertex to which the Input or Output is connected to.
+ */
+ private String edgeVertexName;
+
+ /**
+ * i'th physical input/output that this event maps to.
+ */
+ private int index;
+
+ /**
+ * Task Attempt ID
+ */
+ private TezTaskAttemptID taskAttemptID;
+
+ public EventMetaData() {
+ }
+
+ public EventMetaData(EventProducerConsumerType generator,
+ String taskVertexName, String edgeVertexName,
+ TezTaskAttemptID taskAttemptID) {
+ this.producerConsumerType = generator;
+ this.taskVertexName = taskVertexName;
+ this.edgeVertexName = edgeVertexName;
+ this.taskAttemptID = taskAttemptID;
+ }
+
+ public EventProducerConsumerType getEventGenerator() {
+ return producerConsumerType;
+ }
+
+ public TezTaskAttemptID getTaskAttemptID() {
+ return taskAttemptID;
+ }
+
+ public String getTaskVertexName() {
+ return taskVertexName;
+ }
+
+ public String getEdgeVertexName() {
+ return edgeVertexName;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(producerConsumerType.ordinal());
+ if (taskVertexName != null) {
+ out.writeBoolean(true);
+ out.writeUTF(taskVertexName);
+ } else {
+ out.writeBoolean(false);
+ }
+ if (edgeVertexName != null) {
+ out.writeBoolean(true);
+ out.writeUTF(edgeVertexName);
+ } else {
+ out.writeBoolean(false);
+ }
+ if(taskAttemptID != null) {
+ out.writeBoolean(true);
+ taskAttemptID.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+
+ out.writeInt(index);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ producerConsumerType = EventProducerConsumerType.values()[in.readInt()];
+ if (in.readBoolean()) {
+ taskVertexName = in.readUTF();
+ }
+ if (in.readBoolean()) {
+ edgeVertexName = in.readUTF();
+ }
+ if (in.readBoolean()) {
+ taskAttemptID = new TezTaskAttemptID();
+ taskAttemptID.readFields(in);
+ }
+ index = in.readInt();
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public void setIndex(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public String toString() {
+ return "{ producerConsumerType=" + producerConsumerType
+ + ", taskVertexName=" + taskVertexName
+ + ", edgeVertexName=" + edgeVertexName
+ + ", taskAttemptId=" + taskAttemptID
+ + ", index=" + index + " }";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
new file mode 100644
index 0000000..81ff5fc
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+public enum EventType {
+ TASK_ATTEMPT_COMPLETED_EVENT,
+ TASK_ATTEMPT_FAILED_EVENT,
+ DATA_MOVEMENT_EVENT,
+ INPUT_READ_ERROR_EVENT,
+ INPUT_FAILED_EVENT,
+ INTPUT_INFORMATION_EVENT,
+ TASK_STATUS_UPDATE_EVENT
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
new file mode 100644
index 0000000..78ed886
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+public class InputSpec implements Writable {
+
+ private String sourceVertexName;
+ private InputDescriptor inputDescriptor;
+ private int physicalEdgeCount;
+
+ public InputSpec() {
+ }
+
+ public InputSpec(String sourceVertexName, InputDescriptor inputDescriptor,
+ int physicalEdgeCount) {
+ this.sourceVertexName = sourceVertexName;
+ this.inputDescriptor = inputDescriptor;
+ this.physicalEdgeCount = physicalEdgeCount;
+ }
+
+ public String getSourceVertexName() {
+ return sourceVertexName;
+ }
+
+ public InputDescriptor getInputDescriptor() {
+ return inputDescriptor;
+ }
+
+ public int getPhysicalEdgeCount() {
+ return physicalEdgeCount;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // TODONEWTEZ convert to PB
+ out.writeUTF(sourceVertexName);
+ out.writeInt(physicalEdgeCount);
+ byte[] inputDescBytes =
+ DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
+ out.writeInt(inputDescBytes.length);
+ out.write(inputDescBytes);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ sourceVertexName = in.readUTF();
+ physicalEdgeCount = in.readInt();
+ int inputDescLen = in.readInt();
+ byte[] inputDescBytes = new byte[inputDescLen];
+ in.readFully(inputDescBytes);
+ inputDescriptor =
+ DagTypeConverters.convertInputDescriptorFromDAGPlan(
+ TezEntityDescriptorProto.parseFrom(inputDescBytes));
+ }
+
+ public String toString() {
+ return "{ sourceVertexName=" + sourceVertexName
+ + ", physicalEdgeCount" + physicalEdgeCount
+ + ", inputClassName=" + inputDescriptor.getClassName()
+ + " }";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
new file mode 100644
index 0000000..4034cdd
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+public class OutputSpec implements Writable {
+
+ private String destinationVertexName;
+ private OutputDescriptor outputDescriptor;
+ private int physicalEdgeCount;
+
+ public OutputSpec() {
+ }
+
+ public OutputSpec(String destinationVertexName,
+ OutputDescriptor outputDescriptor, int physicalEdgeCount) {
+ this.destinationVertexName = destinationVertexName;
+ this.outputDescriptor = outputDescriptor;
+ this.physicalEdgeCount = physicalEdgeCount;
+ }
+
+ public String getDestinationVertexName() {
+ return destinationVertexName;
+ }
+
+ public OutputDescriptor getOutputDescriptor() {
+ return outputDescriptor;
+ }
+
+ public int getPhysicalEdgeCount() {
+ return physicalEdgeCount;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // TODONEWTEZ convert to PB
+ out.writeUTF(destinationVertexName);
+ out.writeInt(physicalEdgeCount);
+ byte[] inputDescBytes =
+ DagTypeConverters.convertToDAGPlan(outputDescriptor).toByteArray();
+ out.writeInt(inputDescBytes.length);
+ out.write(inputDescBytes);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ destinationVertexName = in.readUTF();
+ physicalEdgeCount = in.readInt();
+ int inputDescLen = in.readInt();
+ byte[] inputDescBytes = new byte[inputDescLen];
+ in.readFully(inputDescBytes);
+ outputDescriptor =
+ DagTypeConverters.convertOutputDescriptorFromDAGPlan(
+ TezEntityDescriptorProto.parseFrom(inputDescBytes));
+ }
+
+ public String toString() {
+ return "{ destinationVertexName=" + destinationVertexName
+ + ", physicalEdgeCount" + physicalEdgeCount
+ + ", outputClassName=" + outputDescriptor.getClassName()
+ + " }";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
new file mode 100644
index 0000000..6e0995a
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class TaskSpec implements Writable {
+
+ private TezTaskAttemptID taskAttemptId;
+ private String vertexName;
+ private String user;
+ private ProcessorDescriptor processorDescriptor;
+ private List<InputSpec> inputSpecList;
+ private List<OutputSpec> outputSpecList;
+
+ public TaskSpec() {
+ }
+
+ // TODO NEWTEZ Remove user
+ public TaskSpec(TezTaskAttemptID taskAttemptID, String user,
+ String vertexName, ProcessorDescriptor processorDescriptor,
+ List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
+ this.taskAttemptId = taskAttemptID;
+ this.vertexName = vertexName;
+ this.user = user;
+ this.processorDescriptor = processorDescriptor;
+ this.inputSpecList = inputSpecList;
+ this.outputSpecList = outputSpecList;
+ }
+
+ public String getVertexName() {
+ return vertexName;
+ }
+
+ public TezTaskAttemptID getTaskAttemptID() {
+ return taskAttemptId;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public ProcessorDescriptor getProcessorDescriptor() {
+ return processorDescriptor;
+ }
+
+ public List<InputSpec> getInputs() {
+ return inputSpecList;
+ }
+
+ public List<OutputSpec> getOutputs() {
+ return outputSpecList;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ taskAttemptId.write(out);
+ out.writeUTF(vertexName);
+ byte[] procDesc =
+ DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
+ out.writeInt(procDesc.length);
+ out.write(procDesc);
+ out.writeInt(inputSpecList.size());
+ for (InputSpec inputSpec : inputSpecList) {
+ inputSpec.write(out);
+ }
+ out.writeInt(outputSpecList.size());
+ for (OutputSpec outputSpec : outputSpecList) {
+ outputSpec.write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ taskAttemptId = new TezTaskAttemptID();
+ taskAttemptId.readFields(in);
+ vertexName = in.readUTF();
+ int procDescLength = in.readInt();
+ // TODO at least 3 buffer copies here. Need to convert this to full PB
+ // TEZ-305
+ byte[] procDescBytes = new byte[procDescLength];
+ in.readFully(procDescBytes);
+ processorDescriptor =
+ DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
+ TezEntityDescriptorProto.parseFrom(procDescBytes));
+ int numInputSpecs = in.readInt();
+ inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
+ for (int i = 0; i < numInputSpecs; i++) {
+ InputSpec inputSpec = new InputSpec();
+ inputSpec.readFields(in);
+ inputSpecList.add(inputSpec);
+ }
+ int numOutputSpecs = in.readInt();
+ outputSpecList = new ArrayList<OutputSpec>(numOutputSpecs);
+ for (int i = 0; i < numOutputSpecs; i++) {
+ OutputSpec outputSpec = new OutputSpec();
+ outputSpec.readFields(in);
+ outputSpecList.add(outputSpec);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("TaskAttemptID:" + taskAttemptId);
+ sb.append("processorName=" + processorDescriptor.getClassName()
+ + ", inputSpecListSize=" + inputSpecList.size()
+ + ", outputSpecListSize=" + outputSpecList.size());
+ sb.append(", inputSpecList=[");
+ for (InputSpec i : inputSpecList) {
+ sb.append("{" + i.toString() + "}, ");
+ }
+ sb.append("], outputSpecList=[");
+ for (OutputSpec i : outputSpecList) {
+ sb.append("{" + i.toString() + "}, ");
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
new file mode 100644
index 0000000..e195cf9
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.EventProtos.DataMovementEventProto;
+import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto;
+import org.apache.tez.runtime.api.events.EventProtos.InputInformationEventProto;
+import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.api.events.InputInformationEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
+import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
+
+import com.google.protobuf.ByteString;
+
+public class TezEvent implements Writable {
+
+ private EventType eventType;
+
+ private Event event;
+
+ private EventMetaData sourceInfo;
+
+ private EventMetaData destinationInfo;
+
+ public TezEvent() {
+ }
+
+ public TezEvent(Event event, EventMetaData sourceInfo) {
+ this.event = event;
+ this.setSourceInfo(sourceInfo);
+ if (event instanceof DataMovementEvent) {
+ eventType = EventType.DATA_MOVEMENT_EVENT;
+ } else if (event instanceof InputReadErrorEvent) {
+ eventType = EventType.INPUT_READ_ERROR_EVENT;
+ } else if (event instanceof TaskAttemptFailedEvent) {
+ eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
+ } else if (event instanceof TaskAttemptCompletedEvent) {
+ eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
+ } else if (event instanceof InputInformationEvent) {
+ eventType = EventType.INTPUT_INFORMATION_EVENT;
+ } else if (event instanceof InputFailedEvent) {
+ eventType = EventType.INPUT_FAILED_EVENT;
+ } else if (event instanceof TaskStatusUpdateEvent) {
+ eventType = EventType.TASK_STATUS_UPDATE_EVENT;
+ } else {
+ throw new TezUncheckedException("Unknown event, event="
+ + event.getClass().getName());
+ }
+ }
+
+ public Event getEvent() {
+ return event;
+ }
+
+ public EventMetaData getSourceInfo() {
+ return sourceInfo;
+ }
+
+ public void setSourceInfo(EventMetaData sourceInfo) {
+ this.sourceInfo = sourceInfo;
+ }
+
+ public EventMetaData getDestinationInfo() {
+ return destinationInfo;
+ }
+
+ public void setDestinationInfo(EventMetaData destinationInfo) {
+ this.destinationInfo = destinationInfo;
+ }
+
+ public EventType getEventType() {
+ return eventType;
+ }
+
+ private void serializeEvent(DataOutput out) throws IOException {
+ if (event == null) {
+ out.writeBoolean(false);
+ return;
+ }
+ out.writeBoolean(true);
+ out.writeInt(eventType.ordinal());
+ if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
+ // TODO NEWTEZ convert to PB
+ TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
+ sEvt.write(out);
+ } else {
+ byte[] eventBytes = null;
+ switch (eventType) {
+ case DATA_MOVEMENT_EVENT:
+ DataMovementEvent dmEvt = (DataMovementEvent) event;
+ eventBytes = DataMovementEventProto.newBuilder()
+ .setSourceIndex(dmEvt.getSourceIndex())
+ .setTargetIndex(dmEvt.getTargetIndex())
+ .setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
+ .build().toByteArray();
+ break;
+ case INPUT_READ_ERROR_EVENT:
+ InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
+ eventBytes = InputReadErrorEventProto.newBuilder()
+ .setIndex(ideEvt.getIndex())
+ .setDiagnostics(ideEvt.getDiagnostics())
+ .build().toByteArray();
+ break;
+ case TASK_ATTEMPT_FAILED_EVENT:
+ TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
+ eventBytes = TaskAttemptFailedEventProto.newBuilder()
+ .setDiagnostics(tfEvt.getDiagnostics())
+ .build().toByteArray();
+ break;
+ case TASK_ATTEMPT_COMPLETED_EVENT:
+ eventBytes = TaskAttemptCompletedEventProto.newBuilder()
+ .build().toByteArray();
+ break;
+ case INPUT_FAILED_EVENT:
+ InputFailedEvent ifEvt = (InputFailedEvent) event;
+ eventBytes = InputFailedEventProto.newBuilder()
+ .setSourceIndex(ifEvt.getSourceIndex())
+ .setTargetIndex(ifEvt.getTargetIndex())
+ .setVersion(ifEvt.getVersion()).build().toByteArray();
+ case INTPUT_INFORMATION_EVENT:
+ InputInformationEvent iEvt = (InputInformationEvent) event;
+ eventBytes = InputInformationEventProto.newBuilder()
+ .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
+ .build().toByteArray();
+ default:
+ throw new TezUncheckedException("Unknown TezEvent"
+ + ", type=" + eventType);
+ }
+ out.writeInt(eventBytes.length);
+ out.write(eventBytes);
+ }
+ }
+
+ private void deserializeEvent(DataInput in) throws IOException {
+ if (!in.readBoolean()) {
+ event = null;
+ return;
+ }
+ eventType = EventType.values()[in.readInt()];
+ if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
+ // TODO NEWTEZ convert to PB
+ event = new TaskStatusUpdateEvent();
+ ((TaskStatusUpdateEvent)event).readFields(in);
+ } else {
+ int eventBytesLen = in.readInt();
+ byte[] eventBytes = new byte[eventBytesLen];
+ in.readFully(eventBytes);
+ switch (eventType) {
+ case DATA_MOVEMENT_EVENT:
+ DataMovementEventProto dmProto =
+ DataMovementEventProto.parseFrom(eventBytes);
+ event = new DataMovementEvent(dmProto.getSourceIndex(),
+ dmProto.getTargetIndex(),
+ dmProto.getUserPayload().toByteArray());
+ break;
+ case INPUT_READ_ERROR_EVENT:
+ InputReadErrorEventProto ideProto =
+ InputReadErrorEventProto.parseFrom(eventBytes);
+ event = new InputReadErrorEvent(ideProto.getDiagnostics(),
+ ideProto.getIndex(), ideProto.getVersion());
+ break;
+ case TASK_ATTEMPT_FAILED_EVENT:
+ TaskAttemptFailedEventProto tfProto =
+ TaskAttemptFailedEventProto.parseFrom(eventBytes);
+ event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
+ break;
+ case TASK_ATTEMPT_COMPLETED_EVENT:
+ event = new TaskAttemptCompletedEvent();
+ break;
+ case INPUT_FAILED_EVENT:
+ InputFailedEventProto ifProto =
+ InputFailedEventProto.parseFrom(eventBytes);
+ event = new InputFailedEvent(ifProto.getSourceIndex(),
+ ifProto.getTargetIndex(), ifProto.getVersion());
+ break;
+ case INTPUT_INFORMATION_EVENT:
+ InputInformationEventProto infoProto =
+ InputInformationEventProto.parseFrom(eventBytes);
+ event = new InputInformationEvent(
+ infoProto.getUserPayload().toByteArray());
+ break;
+ default:
+ throw new TezUncheckedException("Unknown TezEvent"
+ + ", type=" + eventType);
+ }
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ serializeEvent(out);
+ if (sourceInfo != null) {
+ out.writeBoolean(true);
+ sourceInfo.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ if (destinationInfo != null) {
+ out.writeBoolean(true);
+ destinationInfo.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ deserializeEvent(in);
+ if (in.readBoolean()) {
+ sourceInfo = new EventMetaData();
+ sourceInfo.readFields(in);
+ }
+ if (in.readBoolean()) {
+ destinationInfo = new EventMetaData();
+ destinationInfo.readFields(in);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
new file mode 100644
index 0000000..af7cebb
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+
+public class TezHeartbeatRequest implements Writable {
+
+ private String containerIdentifier;
+ private List<TezEvent> events;
+ private TezTaskAttemptID currentTaskAttemptID;
+ private int startIndex;
+ private int maxEvents;
+ private long requestId;
+
+ public TezHeartbeatRequest() {
+ }
+
+ public TezHeartbeatRequest(long requestId, List<TezEvent> events,
+ String containerIdentifier, TezTaskAttemptID taskAttemptID,
+ int startIndex, int maxEvents) {
+ this.containerIdentifier = containerIdentifier;
+ this.requestId = requestId;
+ this.events = Collections.unmodifiableList(events);
+ this.startIndex = startIndex;
+ this.maxEvents = maxEvents;
+ this.currentTaskAttemptID = taskAttemptID;
+ }
+
+ public String getContainerIdentifier() {
+ return containerIdentifier;
+ }
+
+ public List<TezEvent> getEvents() {
+ return events;
+ }
+
+ public int getStartIndex() {
+ return startIndex;
+ }
+
+ public int getMaxEvents() {
+ return maxEvents;
+ }
+
+ public long getRequestId() {
+ return requestId;
+ }
+
+ public TezTaskAttemptID getCurrentTaskAttemptID() {
+ return currentTaskAttemptID;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (events != null) {
+ out.writeBoolean(true);
+ out.writeInt(events.size());
+ for (TezEvent e : events) {
+ e.write(out);
+ }
+ } else {
+ out.writeBoolean(false);
+ }
+ if (currentTaskAttemptID != null) {
+ out.writeBoolean(true);
+ currentTaskAttemptID.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ out.writeInt(startIndex);
+ out.writeInt(maxEvents);
+ out.writeLong(requestId);
+ Text.writeString(out, containerIdentifier);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ if (in.readBoolean()) {
+ int eventsCount = in.readInt();
+ events = new ArrayList<TezEvent>(eventsCount);
+ for (int i = 0; i < eventsCount; ++i) {
+ TezEvent e = new TezEvent();
+ e.readFields(in);
+ events.add(e);
+ }
+ }
+ if (in.readBoolean()) {
+ currentTaskAttemptID = new TezTaskAttemptID();
+ currentTaskAttemptID.readFields(in);
+ } else {
+ currentTaskAttemptID = null;
+ }
+ startIndex = in.readInt();
+ maxEvents = in.readInt();
+ requestId = in.readLong();
+ containerIdentifier = Text.readString(in);
+ }
+
+ @Override
+ public String toString() {
+ return "{ "
+ + " containerId=" + containerIdentifier
+ + ", requestId=" + requestId
+ + ", startIndex=" + startIndex
+ + ", maxEventsToGet=" + maxEvents
+ + ", taskAttemptId" + currentTaskAttemptID
+ + ", eventCount=" + (events != null ? events.size() : 0)
+ + " }";
+ }
+}