You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/23 19:45:22 UTC
[08/20] Rename *.new* packages back to what they should be,
remove dead code from the old packages - mapreduce module -
tez-engine module (part of TEZ-398). (sseth)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java
deleted file mode 100644
index 9ac92ba..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.oldoutput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.common.sort.SortingOutput;
-import org.apache.tez.engine.records.OutputContext;
-
-/**
- * {@link OldInMemorySortedOutput} is an {@link Output} which sorts key/value pairs
- * written to it and persists it to a file.
- */
-public class OldInMemorySortedOutput implements SortingOutput {
-
- public OldInMemorySortedOutput(TezEngineTaskContext task) throws IOException {
- }
-
- public void initialize(Configuration conf, Master master)
- throws IOException, InterruptedException {
- }
-
- public void setTask(RunningTaskContext task) {
- }
-
- public void write(Object key, Object value) throws IOException,
- InterruptedException {
- }
-
- public void close() throws IOException, InterruptedException {
- }
-
- @Override
- public OutputContext getOutputContext() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java
deleted file mode 100644
index b7f913c..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.lib.oldoutput;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.common.TezEngineTaskContext;
-
-public class OldLocalOnFileSorterOutput extends OldOnFileSortedOutput {
-
- private static final Log LOG = LogFactory.getLog(OldLocalOnFileSorterOutput.class);
-
- public OldLocalOnFileSorterOutput(TezEngineTaskContext task) throws IOException {
- super(task);
- }
-
- @Override
- public void close() throws IOException, InterruptedException {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java
deleted file mode 100644
index f259df9..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.oldoutput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.common.sort.SortingOutput;
-import org.apache.tez.engine.records.OutputContext;
-
-/**
- * {@link OldOnFileSortedOutput} is an {@link Output} which sorts key/value pairs
- * written to it and persists it to a file.
- */
-public class OldOnFileSortedOutput implements SortingOutput {
-
- public OldOnFileSortedOutput(TezEngineTaskContext task) throws IOException {
- }
-
- @Override
- public void initialize(Configuration conf, Master master)
- throws IOException, InterruptedException {
- }
-
- @Override
- public void setTask(RunningTaskContext task) {
- }
-
- @Override
- public void write(Object key, Object value) throws IOException,
- InterruptedException {
- }
-
- @Override
- public void close() throws IOException, InterruptedException {
- }
-
- @Override
- public OutputContext getOutputContext() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
index 5d2a2ba..218aa21 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
@@ -22,9 +22,9 @@ import java.util.Collections;
import java.util.List;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.engine.api.KVWriter;
import org.apache.tez.engine.common.sort.impl.dflt.InMemoryShuffleSorter;
import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVWriter;
import org.apache.tez.engine.newapi.LogicalOutput;
import org.apache.tez.engine.newapi.Output;
import org.apache.tez.engine.newapi.TezOutputContext;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
index d23ac1e..963276d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
@@ -25,7 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
import org.apache.tez.engine.newapi.Event;
public class LocalOnFileSorterOutput extends OnFileSortedOutput {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
index ffb36c5..7e0ca37 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
@@ -26,11 +26,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.engine.api.KVWriter;
import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.engine.common.sort.impl.ExternalSorter;
import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVWriter;
import org.apache.tez.engine.newapi.LogicalOutput;
import org.apache.tez.engine.newapi.TezOutputContext;
import org.apache.tez.engine.newapi.events.DataMovementEvent;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
index ec193c5..37edde8 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
@@ -24,10 +24,10 @@ import java.util.List;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.api.KVWriter;
import org.apache.tez.engine.broadcast.output.FileBasedKVWriter;
import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVWriter;
import org.apache.tez.engine.newapi.LogicalOutput;
import org.apache.tez.engine.newapi.TezOutputContext;
import org.apache.tez.engine.newapi.events.DataMovementEvent;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
deleted file mode 100644
index 79615ce..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi;
-
-import java.io.IOException;
-
-/**
- * A key/value(s) pair based {@link Reader}.
- *
- * Example usage
- * <code>
- * while (kvReader.next()) {
- * KVRecord kvRecord = getCurrentKV();
- * Object key = kvRecord.getKey();
- * Iterable values = kvRecord.getValues();
- * </code>
- *
- */
-public interface KVReader extends Reader {
-
- /**
- * Moves to the next key/values(s) pair
- *
- * @return true if another key/value(s) pair exists, false if there are no more.
- * @throws IOException
- * if an error occurs
- */
- public boolean next() throws IOException;
-
- /**
- * Return the current key/value(s) pair. Use moveToNext() to advance.
- * @return
- * @throws IOException
- */
- public KVRecord getCurrentKV() throws IOException;
-
- // TODO NEWTEZ Move this to getCurrentKey and getCurrentValue independently. Otherwise usage pattern seems to be getCurrentKV.getKey, getCurrentKV.getValue
-
- // TODO NEWTEZ KVRecord which does not need to return a list!
- // TODO NEWTEZ Parameterize this
- /**
- * Represents a key and an associated set of values
- *
- */
- public static class KVRecord {
-
- private Object key;
- private Iterable<Object> values;
-
- public KVRecord(Object key, Iterable<Object> values) {
- this.key = key;
- this.values = values;
- }
-
- public Object getKey() {
- return this.key;
- }
-
- public Iterable<Object> getValues() {
- return this.values;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
deleted file mode 100644
index ad48912..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi;
-
-import java.io.IOException;
-
-/**
- * A key/value(s) pair based {@link Writer}
- */
-public interface KVWriter extends Writer {
- /**
- * Writes a key/value pair.
- *
- * @param key
- * the key to write
- * @param value
- * the value to write
- * @throws IOException
- * if an error occurs
- */
- public void write(Object key, Object value) throws IOException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java
deleted file mode 100644
index d3a582d..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events;
-
-import org.apache.tez.engine.newapi.Event;
-
-public class TaskAttemptCompletedEvent extends Event {
-
- public TaskAttemptCompletedEvent() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java
deleted file mode 100644
index 772d7fe..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events;
-
-import org.apache.tez.engine.newapi.Event;
-
-public class TaskAttemptFailedEvent extends Event {
-
- private final String diagnostics;
-
- public TaskAttemptFailedEvent(String diagnostics) {
- this.diagnostics = diagnostics;
- }
-
- public String getDiagnostics() {
- return diagnostics;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskStatusUpdateEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskStatusUpdateEvent.java
deleted file mode 100644
index 0f09867..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskStatusUpdateEvent.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.engine.newapi.Event;
-
-public class TaskStatusUpdateEvent extends Event implements Writable {
-
- private TezCounters tezCounters;
- private float progress;
-
- public TaskStatusUpdateEvent() {
- }
-
- public TaskStatusUpdateEvent(TezCounters tezCounters, float progress) {
- this.tezCounters = tezCounters;
- this.progress = progress;
- }
-
- public TezCounters getCounters() {
- return tezCounters;
- }
-
- public float getProgress() {
- return progress;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeFloat(progress);
- if (tezCounters != null) {
- out.writeBoolean(true);
- tezCounters.write(out);
- } else {
- out.writeBoolean(false);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- progress = in.readFloat();
- if (in.readBoolean()) {
- tezCounters = new TezCounters();
- tezCounters.readFields(in);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
deleted file mode 100644
index 9faafc5..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * Class that encapsulates all the information to identify the unique
- * object that either generated an Event or is the recipient of an Event.
- */
-public class EventMetaData implements Writable {
-
- public static enum EventProducerConsumerType {
- INPUT,
- PROCESSOR,
- OUTPUT,
- SYSTEM
- }
-
- /**
- * Producer Type ( one of Input/Output/Processor ) that generated the Event
- * or Consumer Type that will consume the Event.
- */
- private EventProducerConsumerType producerConsumerType;
-
- /**
- * Name of the vertex where the event was generated.
- */
- private String taskVertexName;
-
- /**
- * Name of the vertex to which the Input or Output is connected to.
- */
- private String edgeVertexName;
-
- /**
- * i'th physical input/output that this event maps to.
- */
- private int index;
-
- /**
- * Task Attempt ID
- */
- private TezTaskAttemptID taskAttemptID;
-
- public EventMetaData() {
- }
-
- public EventMetaData(EventProducerConsumerType generator,
- String taskVertexName, String edgeVertexName,
- TezTaskAttemptID taskAttemptID) {
- this.producerConsumerType = generator;
- this.taskVertexName = taskVertexName;
- this.edgeVertexName = edgeVertexName;
- this.taskAttemptID = taskAttemptID;
- }
-
- public EventProducerConsumerType getEventGenerator() {
- return producerConsumerType;
- }
-
- public TezTaskAttemptID getTaskAttemptID() {
- return taskAttemptID;
- }
-
- public String getTaskVertexName() {
- return taskVertexName;
- }
-
- public String getEdgeVertexName() {
- return edgeVertexName;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(producerConsumerType.ordinal());
- if (taskVertexName != null) {
- out.writeBoolean(true);
- out.writeUTF(taskVertexName);
- } else {
- out.writeBoolean(false);
- }
- if (edgeVertexName != null) {
- out.writeBoolean(true);
- out.writeUTF(edgeVertexName);
- } else {
- out.writeBoolean(false);
- }
- if(taskAttemptID != null) {
- out.writeBoolean(true);
- taskAttemptID.write(out);
- } else {
- out.writeBoolean(false);
- }
-
- out.writeInt(index);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- producerConsumerType = EventProducerConsumerType.values()[in.readInt()];
- if (in.readBoolean()) {
- taskVertexName = in.readUTF();
- }
- if (in.readBoolean()) {
- edgeVertexName = in.readUTF();
- }
- if (in.readBoolean()) {
- taskAttemptID = new TezTaskAttemptID();
- taskAttemptID.readFields(in);
- }
- index = in.readInt();
- }
-
- public int getIndex() {
- return index;
- }
-
- public void setIndex(int index) {
- this.index = index;
- }
-
- @Override
- public String toString() {
- return "{ producerConsumerType=" + producerConsumerType
- + ", taskVertexName=" + taskVertexName
- + ", edgeVertexName=" + edgeVertexName
- + ", taskAttemptId=" + taskAttemptID
- + ", index=" + index + " }";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
deleted file mode 100644
index 87d6665..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-public enum EventType {
- TASK_ATTEMPT_COMPLETED_EVENT,
- TASK_ATTEMPT_FAILED_EVENT,
- DATA_MOVEMENT_EVENT,
- INPUT_READ_ERROR_EVENT,
- INPUT_FAILED_EVENT,
- INTPUT_INFORMATION_EVENT,
- TASK_STATUS_UPDATE_EVENT
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
deleted file mode 100644
index a2b8cc8..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
-
-public class InputSpec implements Writable {
-
- private String sourceVertexName;
- private InputDescriptor inputDescriptor;
- private int physicalEdgeCount;
-
- public InputSpec() {
- }
-
- public InputSpec(String sourceVertexName, InputDescriptor inputDescriptor,
- int physicalEdgeCount) {
- this.sourceVertexName = sourceVertexName;
- this.inputDescriptor = inputDescriptor;
- this.physicalEdgeCount = physicalEdgeCount;
- }
-
- public String getSourceVertexName() {
- return sourceVertexName;
- }
-
- public InputDescriptor getInputDescriptor() {
- return inputDescriptor;
- }
-
- public int getPhysicalEdgeCount() {
- return physicalEdgeCount;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- // TODONEWTEZ convert to PB
- out.writeUTF(sourceVertexName);
- out.writeInt(physicalEdgeCount);
- byte[] inputDescBytes =
- DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
- out.writeInt(inputDescBytes.length);
- out.write(inputDescBytes);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- sourceVertexName = in.readUTF();
- physicalEdgeCount = in.readInt();
- int inputDescLen = in.readInt();
- byte[] inputDescBytes = new byte[inputDescLen];
- in.readFully(inputDescBytes);
- inputDescriptor =
- DagTypeConverters.convertInputDescriptorFromDAGPlan(
- TezEntityDescriptorProto.parseFrom(inputDescBytes));
- }
-
- public String toString() {
- return "{ sourceVertexName=" + sourceVertexName
- + ", physicalEdgeCount" + physicalEdgeCount
- + ", inputClassName=" + inputDescriptor.getClassName()
- + " }";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
deleted file mode 100644
index 1b34ef0..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
-
-public class OutputSpec implements Writable {
-
- private String destinationVertexName;
- private OutputDescriptor outputDescriptor;
- private int physicalEdgeCount;
-
- public OutputSpec() {
- }
-
- public OutputSpec(String destinationVertexName,
- OutputDescriptor inputDescriptor, int physicalEdgeCount) {
- this.destinationVertexName = destinationVertexName;
- this.outputDescriptor = inputDescriptor;
- this.physicalEdgeCount = physicalEdgeCount;
- }
-
- public String getDestinationVertexName() {
- return destinationVertexName;
- }
-
- public OutputDescriptor getOutputDescriptor() {
- return outputDescriptor;
- }
-
- public int getPhysicalEdgeCount() {
- return physicalEdgeCount;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- // TODONEWTEZ convert to PB
- out.writeUTF(destinationVertexName);
- out.writeInt(physicalEdgeCount);
- byte[] inputDescBytes =
- DagTypeConverters.convertToDAGPlan(outputDescriptor).toByteArray();
- out.writeInt(inputDescBytes.length);
- out.write(inputDescBytes);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- destinationVertexName = in.readUTF();
- physicalEdgeCount = in.readInt();
- int inputDescLen = in.readInt();
- byte[] inputDescBytes = new byte[inputDescLen];
- in.readFully(inputDescBytes);
- outputDescriptor =
- DagTypeConverters.convertOutputDescriptorFromDAGPlan(
- TezEntityDescriptorProto.parseFrom(inputDescBytes));
- }
-
- public String toString() {
- return "{ destinationVertexName=" + destinationVertexName
- + ", physicalEdgeCount" + physicalEdgeCount
- + ", outputClassName=" + outputDescriptor.getClassName()
- + " }";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
deleted file mode 100644
index 8290e30..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public class TaskSpec implements Writable {
-
- private TezTaskAttemptID taskAttemptId;
- private String vertexName;
- private String user;
- private ProcessorDescriptor processorDescriptor;
- private List<InputSpec> inputSpecList;
- private List<OutputSpec> outputSpecList;
-
- public TaskSpec() {
- }
-
- // TODO NEWTEZ Remove user
- public TaskSpec(TezTaskAttemptID taskAttemptID, String user,
- String vertexName, ProcessorDescriptor processorDescriptor,
- List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
- this.taskAttemptId = taskAttemptID;
- this.vertexName = vertexName;
- this.user = user;
- this.processorDescriptor = processorDescriptor;
- this.inputSpecList = inputSpecList;
- this.outputSpecList = outputSpecList;
- }
-
- public String getVertexName() {
- return vertexName;
- }
-
- public TezTaskAttemptID getTaskAttemptID() {
- return taskAttemptId;
- }
-
- public String getUser() {
- return user;
- }
-
- public ProcessorDescriptor getProcessorDescriptor() {
- return processorDescriptor;
- }
-
- public List<InputSpec> getInputs() {
- return inputSpecList;
- }
-
- public List<OutputSpec> getOutputs() {
- return outputSpecList;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- taskAttemptId.write(out);
- out.writeUTF(vertexName);
- byte[] procDesc =
- DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
- out.writeInt(procDesc.length);
- out.write(procDesc);
- out.writeInt(inputSpecList.size());
- for (InputSpec inputSpec : inputSpecList) {
- inputSpec.write(out);
- }
- out.writeInt(outputSpecList.size());
- for (OutputSpec outputSpec : outputSpecList) {
- outputSpec.write(out);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- taskAttemptId = new TezTaskAttemptID();
- taskAttemptId.readFields(in);
- vertexName = in.readUTF();
- int procDescLength = in.readInt();
- // TODO at least 3 buffer copies here. Need to convert this to full PB
- // TEZ-305
- byte[] procDescBytes = new byte[procDescLength];
- in.readFully(procDescBytes);
- processorDescriptor =
- DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
- TezEntityDescriptorProto.parseFrom(procDescBytes));
- int numInputSpecs = in.readInt();
- inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
- for (int i = 0; i < numInputSpecs; i++) {
- InputSpec inputSpec = new InputSpec();
- inputSpec.readFields(in);
- inputSpecList.add(inputSpec);
- }
- int numOutputSpecs = in.readInt();
- outputSpecList = new ArrayList<OutputSpec>(numOutputSpecs);
- for (int i = 0; i < numOutputSpecs; i++) {
- OutputSpec outputSpec = new OutputSpec();
- outputSpec.readFields(in);
- outputSpecList.add(outputSpec);
- }
- }
-
- @Override
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("TaskAttemptID:" + taskAttemptId);
- sb.append("processorName=" + processorDescriptor.getClassName()
- + ", inputSpecListSize=" + inputSpecList.size()
- + ", outputSpecListSize=" + outputSpecList.size());
- sb.append(", inputSpecList=[");
- for (InputSpec i : inputSpecList) {
- sb.append("{" + i.toString() + "}, ");
- }
- sb.append("], outputSpecList=[");
- for (OutputSpec i : outputSpecList) {
- sb.append("{" + i.toString() + "}, ");
- }
- sb.append("]");
- return sb.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
deleted file mode 100644
index 0f65750..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.events.EventProtos.DataMovementEventProto;
-import org.apache.tez.engine.api.events.EventProtos.InputFailedEventProto;
-import org.apache.tez.engine.api.events.EventProtos.InputInformationEventProto;
-import org.apache.tez.engine.api.events.EventProtos.InputReadErrorEventProto;
-import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
-import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
-import org.apache.tez.engine.newapi.events.InputInformationEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.events.TaskAttemptCompletedEvent;
-import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
-import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
-
-import com.google.protobuf.ByteString;
-
-public class TezEvent implements Writable {
-
- private EventType eventType;
-
- private Event event;
-
- private EventMetaData sourceInfo;
-
- private EventMetaData destinationInfo;
-
- public TezEvent() {
- }
-
- public TezEvent(Event event, EventMetaData sourceInfo) {
- this.event = event;
- this.setSourceInfo(sourceInfo);
- if (event instanceof DataMovementEvent) {
- eventType = EventType.DATA_MOVEMENT_EVENT;
- } else if (event instanceof InputReadErrorEvent) {
- eventType = EventType.INPUT_READ_ERROR_EVENT;
- } else if (event instanceof TaskAttemptFailedEvent) {
- eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
- } else if (event instanceof TaskAttemptCompletedEvent) {
- eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
- } else if (event instanceof InputInformationEvent) {
- eventType = EventType.INTPUT_INFORMATION_EVENT;
- } else if (event instanceof InputFailedEvent) {
- eventType = EventType.INPUT_FAILED_EVENT;
- } else if (event instanceof TaskStatusUpdateEvent) {
- eventType = EventType.TASK_STATUS_UPDATE_EVENT;
- } else {
- throw new TezUncheckedException("Unknown event, event="
- + event.getClass().getName());
- }
- }
-
- public Event getEvent() {
- return event;
- }
-
- public EventMetaData getSourceInfo() {
- return sourceInfo;
- }
-
- public void setSourceInfo(EventMetaData sourceInfo) {
- this.sourceInfo = sourceInfo;
- }
-
- public EventMetaData getDestinationInfo() {
- return destinationInfo;
- }
-
- public void setDestinationInfo(EventMetaData destinationInfo) {
- this.destinationInfo = destinationInfo;
- }
-
- public EventType getEventType() {
- return eventType;
- }
-
- private void serializeEvent(DataOutput out) throws IOException {
- if (event == null) {
- out.writeBoolean(false);
- return;
- }
- out.writeBoolean(true);
- out.writeInt(eventType.ordinal());
- if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
- // TODO NEWTEZ convert to PB
- TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
- sEvt.write(out);
- } else {
- byte[] eventBytes = null;
- switch (eventType) {
- case DATA_MOVEMENT_EVENT:
- DataMovementEvent dmEvt = (DataMovementEvent) event;
- eventBytes = DataMovementEventProto.newBuilder()
- .setSourceIndex(dmEvt.getSourceIndex())
- .setTargetIndex(dmEvt.getTargetIndex())
- .setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
- .build().toByteArray();
- break;
- case INPUT_READ_ERROR_EVENT:
- InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
- eventBytes = InputReadErrorEventProto.newBuilder()
- .setIndex(ideEvt.getIndex())
- .setDiagnostics(ideEvt.getDiagnostics())
- .build().toByteArray();
- break;
- case TASK_ATTEMPT_FAILED_EVENT:
- TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
- eventBytes = TaskAttemptFailedEventProto.newBuilder()
- .setDiagnostics(tfEvt.getDiagnostics())
- .build().toByteArray();
- break;
- case TASK_ATTEMPT_COMPLETED_EVENT:
- eventBytes = TaskAttemptCompletedEventProto.newBuilder()
- .build().toByteArray();
- break;
- case INPUT_FAILED_EVENT:
- InputFailedEvent ifEvt = (InputFailedEvent) event;
- eventBytes = InputFailedEventProto.newBuilder()
- .setSourceIndex(ifEvt.getSourceIndex())
- .setTargetIndex(ifEvt.getTargetIndex())
- .setVersion(ifEvt.getVersion()).build().toByteArray();
- case INTPUT_INFORMATION_EVENT:
- InputInformationEvent iEvt = (InputInformationEvent) event;
- eventBytes = InputInformationEventProto.newBuilder()
- .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
- .build().toByteArray();
- default:
- throw new TezUncheckedException("Unknown TezEvent"
- + ", type=" + eventType);
- }
- out.writeInt(eventBytes.length);
- out.write(eventBytes);
- }
- }
-
- private void deserializeEvent(DataInput in) throws IOException {
- if (!in.readBoolean()) {
- event = null;
- return;
- }
- eventType = EventType.values()[in.readInt()];
- if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
- // TODO NEWTEZ convert to PB
- event = new TaskStatusUpdateEvent();
- ((TaskStatusUpdateEvent)event).readFields(in);
- } else {
- int eventBytesLen = in.readInt();
- byte[] eventBytes = new byte[eventBytesLen];
- in.readFully(eventBytes);
- switch (eventType) {
- case DATA_MOVEMENT_EVENT:
- DataMovementEventProto dmProto =
- DataMovementEventProto.parseFrom(eventBytes);
- event = new DataMovementEvent(dmProto.getSourceIndex(),
- dmProto.getTargetIndex(),
- dmProto.getUserPayload().toByteArray());
- break;
- case INPUT_READ_ERROR_EVENT:
- InputReadErrorEventProto ideProto =
- InputReadErrorEventProto.parseFrom(eventBytes);
- event = new InputReadErrorEvent(ideProto.getDiagnostics(),
- ideProto.getIndex(), ideProto.getVersion());
- break;
- case TASK_ATTEMPT_FAILED_EVENT:
- TaskAttemptFailedEventProto tfProto =
- TaskAttemptFailedEventProto.parseFrom(eventBytes);
- event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
- break;
- case TASK_ATTEMPT_COMPLETED_EVENT:
- event = new TaskAttemptCompletedEvent();
- break;
- case INPUT_FAILED_EVENT:
- InputFailedEventProto ifProto =
- InputFailedEventProto.parseFrom(eventBytes);
- event = new InputFailedEvent(ifProto.getSourceIndex(),
- ifProto.getTargetIndex(), ifProto.getVersion());
- break;
- case INTPUT_INFORMATION_EVENT:
- InputInformationEventProto infoProto =
- InputInformationEventProto.parseFrom(eventBytes);
- event = new InputInformationEvent(
- infoProto.getUserPayload().toByteArray());
- break;
- default:
- throw new TezUncheckedException("Unknown TezEvent"
- + ", type=" + eventType);
- }
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- serializeEvent(out);
- if (sourceInfo != null) {
- out.writeBoolean(true);
- sourceInfo.write(out);
- } else {
- out.writeBoolean(false);
- }
- if (destinationInfo != null) {
- out.writeBoolean(true);
- destinationInfo.write(out);
- } else {
- out.writeBoolean(false);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- deserializeEvent(in);
- if (in.readBoolean()) {
- sourceInfo = new EventMetaData();
- sourceInfo.readFields(in);
- }
- if (in.readBoolean()) {
- destinationInfo = new EventMetaData();
- destinationInfo.readFields(in);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
deleted file mode 100644
index 79a0968..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-
-public class TezHeartbeatRequest implements Writable {
-
- private String containerIdentifier;
- private List<TezEvent> events;
- private TezTaskAttemptID currentTaskAttemptID;
- private int startIndex;
- private int maxEvents;
- private long requestId;
-
- public TezHeartbeatRequest() {
- }
-
- public TezHeartbeatRequest(long requestId, List<TezEvent> events,
- String containerIdentifier, TezTaskAttemptID taskAttemptID,
- int startIndex, int maxEvents) {
- this.containerIdentifier = containerIdentifier;
- this.requestId = requestId;
- this.events = Collections.unmodifiableList(events);
- this.startIndex = startIndex;
- this.maxEvents = maxEvents;
- this.currentTaskAttemptID = taskAttemptID;
- }
-
- public String getContainerIdentifier() {
- return containerIdentifier;
- }
-
- public List<TezEvent> getEvents() {
- return events;
- }
-
- public int getStartIndex() {
- return startIndex;
- }
-
- public int getMaxEvents() {
- return maxEvents;
- }
-
- public long getRequestId() {
- return requestId;
- }
-
- public TezTaskAttemptID getCurrentTaskAttemptID() {
- return currentTaskAttemptID;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- if (events != null) {
- out.writeBoolean(true);
- out.writeInt(events.size());
- for (TezEvent e : events) {
- e.write(out);
- }
- } else {
- out.writeBoolean(false);
- }
- if (currentTaskAttemptID != null) {
- out.writeBoolean(true);
- currentTaskAttemptID.write(out);
- } else {
- out.writeBoolean(false);
- }
- out.writeInt(startIndex);
- out.writeInt(maxEvents);
- out.writeLong(requestId);
- Text.writeString(out, containerIdentifier);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- if (in.readBoolean()) {
- int eventsCount = in.readInt();
- events = new ArrayList<TezEvent>(eventsCount);
- for (int i = 0; i < eventsCount; ++i) {
- TezEvent e = new TezEvent();
- e.readFields(in);
- events.add(e);
- }
- }
- if (in.readBoolean()) {
- currentTaskAttemptID = new TezTaskAttemptID();
- currentTaskAttemptID.readFields(in);
- } else {
- currentTaskAttemptID = null;
- }
- startIndex = in.readInt();
- maxEvents = in.readInt();
- requestId = in.readLong();
- containerIdentifier = Text.readString(in);
- }
-
- @Override
- public String toString() {
- return "{ "
- + " containerId=" + containerIdentifier
- + ", requestId=" + requestId
- + ", startIndex=" + startIndex
- + ", maxEventsToGet=" + maxEvents
- + ", taskAttemptId" + currentTaskAttemptID
- + ", eventCount=" + (events != null ? events.size() : 0)
- + " }";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
deleted file mode 100644
index addd17f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-
-public class TezHeartbeatResponse implements Writable {
-
- private long lastRequestId;
- private boolean shouldDie = false;
- private List<TezEvent> events;
-
- public TezHeartbeatResponse() {
- }
-
- public TezHeartbeatResponse(List<TezEvent> events) {
- this.events = Collections.unmodifiableList(events);
- }
-
- public List<TezEvent> getEvents() {
- return events;
- }
-
- public boolean shouldDie() {
- return shouldDie;
- }
-
- public long getLastRequestId() {
- return lastRequestId;
- }
-
- public void setEvents(List<TezEvent> events) {
- this.events = Collections.unmodifiableList(events);
- }
-
- public void setLastRequestId(long lastRequestId ) {
- this.lastRequestId = lastRequestId;
- }
-
- public void setShouldDie() {
- this.shouldDie = true;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(lastRequestId);
- out.writeBoolean(shouldDie);
- if(events != null) {
- out.writeBoolean(true);
- out.writeInt(events.size());
- for (TezEvent e : events) {
- e.write(out);
- }
- } else {
- out.writeBoolean(false);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- lastRequestId = in.readLong();
- shouldDie = in.readBoolean();
- if(in.readBoolean()) {
- int eventCount = in.readInt();
- events = new ArrayList<TezEvent>(eventCount);
- for (int i = 0; i < eventCount; ++i) {
- TezEvent e = new TezEvent();
- e.readFields(in);
- events.add(e);
- }
- }
- }
-
- @Override
- public String toString() {
- return "{ "
- + " lastRequestId=" + lastRequestId
- + ", shouldDie=" + shouldDie
- + ", eventCount=" + (events != null ? events.size() : 0)
- + " }";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
deleted file mode 100644
index daafc5a..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.TezInputContext;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public class TezInputContextImpl extends TezTaskContextImpl
- implements TezInputContext {
-
- private final byte[] userPayload;
- private final String sourceVertexName;
- private final EventMetaData sourceInfo;
-
- @Private
- public TezInputContextImpl(Configuration conf, int appAttemptNumber,
- TezUmbilical tezUmbilical, String taskVertexName,
- String sourceVertexName, TezTaskAttemptID taskAttemptID,
- TezCounters counters, byte[] userPayload,
- RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata) {
- super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
- counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
- this.userPayload = userPayload;
- this.sourceVertexName = sourceVertexName;
- this.sourceInfo = new EventMetaData(
- EventProducerConsumerType.INPUT, taskVertexName, sourceVertexName,
- taskAttemptID);
- this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", taskAttemptID
- .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
- getTaskIndex(), getTaskAttemptNumber(), sourceVertexName);
- }
-
- @Override
- public void sendEvents(List<Event> events) {
- List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
- for (Event e : events) {
- TezEvent tEvt = new TezEvent(e, sourceInfo);
- tezEvents.add(tEvt);
- }
- tezUmbilical.addEvents(tezEvents);
- }
-
- @Override
- public byte[] getUserPayload() {
- return userPayload;
- }
-
- @Override
- public String getSourceVertexName() {
- return sourceVertexName;
- }
-
- @Override
- public void fatalError(Throwable exception, String message) {
- super.signalFatalError(exception, message, sourceInfo);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
deleted file mode 100644
index 9de41ae..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.TezOutputContext;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public class TezOutputContextImpl extends TezTaskContextImpl
- implements TezOutputContext {
-
- private final byte[] userPayload;
- private final String destinationVertexName;
- private final EventMetaData sourceInfo;
-
- @Private
- public TezOutputContextImpl(Configuration conf, int appAttemptNumber,
- TezUmbilical tezUmbilical, String taskVertexName,
- String destinationVertexName,
- TezTaskAttemptID taskAttemptID, TezCounters counters,
- byte[] userPayload, RuntimeTask runtimeTask,
- Map<String, ByteBuffer> serviceConsumerMetadata) {
- super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
- counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
- this.userPayload = userPayload;
- this.destinationVertexName = destinationVertexName;
- this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
- taskVertexName, destinationVertexName, taskAttemptID);
- this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", taskAttemptID
- .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
- getTaskIndex(), getTaskAttemptNumber(), destinationVertexName);
- }
-
- @Override
- public void sendEvents(List<Event> events) {
- List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
- for (Event e : events) {
- TezEvent tEvt = new TezEvent(e, sourceInfo);
- tezEvents.add(tEvt);
- }
- tezUmbilical.addEvents(tezEvents);
- }
-
- @Override
- public byte[] getUserPayload() {
- return userPayload;
- }
-
- @Override
- public String getDestinationVertexName() {
- return destinationVertexName;
- }
-
- @Override
- public void fatalError(Throwable exception, String message) {
- super.signalFatalError(exception, message, sourceInfo);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
deleted file mode 100644
index d710f7a..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.nio.ByteBuffer;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public class TezProcessorContextImpl extends TezTaskContextImpl
- implements TezProcessorContext {
-
- private final byte[] userPayload;
- private final EventMetaData sourceInfo;
-
- public TezProcessorContextImpl(Configuration conf, int appAttemptNumber,
- TezUmbilical tezUmbilical, String vertexName,
- TezTaskAttemptID taskAttemptID, TezCounters counters,
- byte[] userPayload, RuntimeTask runtimeTask,
- Map<String, ByteBuffer> serviceConsumerMetadata) {
- super(conf, appAttemptNumber, vertexName, taskAttemptID,
- counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
- this.userPayload = userPayload;
- this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
- taskVertexName, "", taskAttemptID);
- this.uniqueIdentifier = String.format("%s_%s_%06d_%02d", taskAttemptID
- .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
- getTaskIndex(), getTaskAttemptNumber());
- }
-
- @Override
- public void sendEvents(List<Event> events) {
- List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
- for (Event e : events) {
- TezEvent tEvt = new TezEvent(e, sourceInfo);
- tezEvents.add(tEvt);
- }
- tezUmbilical.addEvents(tezEvents);
- }
-
- @Override
- public byte[] getUserPayload() {
- return userPayload;
- }
-
- @Override
- public void setProgress(float progress) {
- runtimeTask.setProgress(progress);
- }
-
- @Override
- public void fatalError(Throwable exception, String message) {
- super.signalFatalError(exception, message, sourceInfo);
- }
-
- @Override
- public boolean canCommit() throws IOException {
- return tezUmbilical.canCommit(this.taskAttemptID);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
deleted file mode 100644
index 1d17158..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.TezTaskContext;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public abstract class TezTaskContextImpl implements TezTaskContext {
-
- private final Configuration conf;
- protected final String taskVertexName;
- protected final TezTaskAttemptID taskAttemptID;
- private final TezCounters counters;
- private String[] workDirs;
- protected String uniqueIdentifier;
- protected final RuntimeTask runtimeTask;
- protected final TezUmbilical tezUmbilical;
- private final Map<String, ByteBuffer> serviceConsumerMetadata;
- private final int appAttemptNumber;
-
- @Private
- public TezTaskContextImpl(Configuration conf, int appAttemptNumber,
- String taskVertexName, TezTaskAttemptID taskAttemptID,
- TezCounters counters, RuntimeTask runtimeTask,
- TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) {
- this.conf = conf;
- this.taskVertexName = taskVertexName;
- this.taskAttemptID = taskAttemptID;
- this.counters = counters;
- // TODO Maybe change this to be task id specific at some point. For now
- // Shuffle code relies on this being a path specified by YARN
- this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
- this.runtimeTask = runtimeTask;
- this.tezUmbilical = tezUmbilical;
- this.serviceConsumerMetadata = serviceConsumerMetadata;
- // TODO NEWTEZ at some point dag attempt should not map to app attempt
- this.appAttemptNumber = appAttemptNumber;
- }
-
- @Override
- public ApplicationId getApplicationId() {
- return taskAttemptID.getTaskID().getVertexID().getDAGId()
- .getApplicationId();
- }
-
- @Override
- public int getTaskIndex() {
- return taskAttemptID.getTaskID().getId();
- }
-
- @Override
- public int getDAGAttemptNumber() {
- return appAttemptNumber;
- }
-
- @Override
- public int getTaskAttemptNumber() {
- return taskAttemptID.getId();
- }
-
- @Override
- public String getDAGName() {
- // TODO NEWTEZ Change to some form of the DAG name, for now using dagId as
- // the unique identifier.
- return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
- }
-
- @Override
- public String getTaskVertexName() {
- return taskVertexName;
- }
-
-
- @Override
- public TezCounters getCounters() {
- return counters;
- }
-
- @Override
- public String[] getWorkDirs() {
- return Arrays.copyOf(workDirs, workDirs.length);
- }
-
- @Override
- public String getUniqueIdentifier() {
- return uniqueIdentifier;
- }
-
- @Override
- public ByteBuffer getServiceConsumerMetaData(String serviceName) {
- return (ByteBuffer) serviceConsumerMetadata.get(serviceName)
- .asReadOnlyBuffer().rewind();
- }
-
- @Override
- public ByteBuffer getServiceProviderMetaData(String serviceName) {
- return AuxiliaryServiceHelper.getServiceDataFromEnv(
- serviceName, System.getenv());
- }
-
- protected void signalFatalError(Throwable t, String message,
- EventMetaData sourceInfo) {
- runtimeTask.setFatalError(t, message);
- String diagnostics;
- if (t != null && message != null) {
- diagnostics = "exceptionThrown=" + StringUtils.stringifyException(t)
- + ", errorMessage=" + message;
- } else if (t == null && message == null) {
- diagnostics = "Unknown error";
- } else {
- diagnostics = t != null ?
- "exceptionThrown=" + StringUtils.stringifyException(t)
- : " errorMessage=" + message;
- }
- tezUmbilical.signalFatalError(taskAttemptID, diagnostics, sourceInfo);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
deleted file mode 100644
index 5889622..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public interface TezUmbilical {
-
- public void addEvents(Collection<TezEvent> events);
-
- public void signalFatalError(TezTaskAttemptID taskAttemptID,
- String diagnostics,
- EventMetaData sourceInfo);
-
- public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index eb055b6..77299de 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -38,6 +38,16 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.api.impl.EventMetaData;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezInputContextImpl;
+import org.apache.tez.engine.api.impl.TezOutputContextImpl;
+import org.apache.tez.engine.api.impl.TezProcessorContextImpl;
+import org.apache.tez.engine.api.impl.TezUmbilical;
+import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.Input;
@@ -49,16 +59,6 @@ import org.apache.tez.engine.newapi.Processor;
import org.apache.tez.engine.newapi.TezInputContext;
import org.apache.tez.engine.newapi.TezOutputContext;
import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newapi.impl.InputSpec;
-import org.apache.tez.engine.newapi.impl.OutputSpec;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
-import org.apache.tez.engine.newapi.impl.TezEvent;
-import org.apache.tez.engine.newapi.impl.TezInputContextImpl;
-import org.apache.tez.engine.newapi.impl.TezOutputContextImpl;
-import org.apache.tez.engine.newapi.impl.TezProcessorContextImpl;
-import org.apache.tez.engine.newapi.impl.TezUmbilical;
import org.apache.tez.engine.shuffle.common.ShuffleUtils;
import com.google.common.base.Preconditions;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
index ee6cde8..22cbc7c 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
@@ -25,9 +25,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
-import org.apache.tez.engine.newapi.impl.TezEvent;
-import org.apache.tez.engine.newapi.impl.TezUmbilical;
+import org.apache.tez.engine.api.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezUmbilical;
public abstract class RuntimeTask {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
deleted file mode 100644
index c673d16..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.runtime;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.api.Processor;
-import org.apache.tez.engine.task.RuntimeTask;
-
-public class RuntimeUtils {
-
- private static final Log LOG = LogFactory.getLog(RuntimeUtils.class);
-
- private static final Class<?>[] CONTEXT_ARRAY =
- new Class[] { TezEngineTaskContext.class };
- private static final Class<?>[] CONTEXT_INT_ARRAY =
- new Class[] { TezEngineTaskContext.class, Integer.TYPE };
- private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
- new ConcurrentHashMap<Class<?>, Constructor<?>>();
-
- @SuppressWarnings("unchecked")
- public static <T> T getNewInstance(Class<T> theClass,
- TezEngineTaskContext context) {
- T result;
- try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
- if (meth == null) {
- meth = theClass.getDeclaredConstructor(CONTEXT_ARRAY);
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
- }
- result = meth.newInstance(context);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return result;
- }
-
- @SuppressWarnings("unchecked")
- public static <T> T getNewInputInstance(Class<T> theClass,
- TezEngineTaskContext context, int index) {
- T result;
- try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
- if (meth == null) {
- meth = theClass.getDeclaredConstructor(CONTEXT_INT_ARRAY);
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
- }
- result = meth.newInstance(context, index);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return result;
- }
-
- public static RuntimeTask createRuntimeTask(
- TezEngineTaskContext taskContext) {
- LOG.info("Creating a runtime task from TaskContext"
- + ", Processor: " + taskContext.getProcessorName()
- + ", InputCount=" + taskContext.getInputSpecList().size()
- + ", OutputCount=" + taskContext.getOutputSpecList().size());
-
- RuntimeTask t = null;
- try {
- Class<?> processorClazz =
- Class.forName(taskContext.getProcessorName());
-
- Processor processor = (Processor) getNewInstance(
- processorClazz, taskContext);
-
- Input[] inputs;
- Output[] outputs;
- if (taskContext.getInputSpecList().isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initializing task with 0 inputs");
- }
- inputs = new Input[0];
- } else {
- int iSpecCount = taskContext.getInputSpecList().size();
- inputs = new Input[iSpecCount];
- for (int i = 0; i < iSpecCount; ++i) {
- InputSpec inSpec = taskContext.getInputSpecList().get(i);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using Input"
- + ", index=" + i
- + ", inputClass=" + inSpec.getInputClassName());
- }
- Class<?> inputClazz = Class.forName(inSpec.getInputClassName());
- Input input = (Input) getNewInputInstance(inputClazz, taskContext, i);
- inputs[i] = input;
- }
- }
- if (taskContext.getOutputSpecList().isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initializing task with 0 outputs");
- }
- outputs = new Output[0];
- } else {
- int oSpecCount = taskContext.getOutputSpecList().size();
- outputs = new Output[oSpecCount];
- for (int i = 0; i < oSpecCount; ++i) {
- OutputSpec outSpec = taskContext.getOutputSpecList().get(i);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using Output"
- + ", index=" + i
- + ", output=" + outSpec.getOutputClassName());
- }
- Class<?> outputClazz = Class.forName(outSpec.getOutputClassName());
- Output output = (Output) getNewInstance(outputClazz, taskContext);
- outputs[i] = output;
- }
- }
- t = createRuntime(taskContext, processor, inputs, outputs);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Unable to initialize RuntimeTask, context="
- + taskContext, e);
- }
- return t;
- }
-
- private static RuntimeTask createRuntime(TezEngineTaskContext taskContext,
- Processor processor, Input[] inputs, Output[] outputs) {
- try {
- // TODO Change this to use getNewInstance
- Class<?> runtimeClazz = Class.forName(taskContext.getRuntimeName());
- Constructor<?> ctor = runtimeClazz.getConstructor(
- TezEngineTaskContext.class, Processor.class, Input[].class,
- Output[].class);
- ctor.setAccessible(true);
- return (RuntimeTask) ctor.newInstance(taskContext, processor, inputs, outputs);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Unable to load runtimeClass: "
- + taskContext.getRuntimeName(), e);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
index e9bfe36..531e460 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
import com.google.common.base.Preconditions;