You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/11/25 15:02:26 UTC
[6/9] tez git commit: TEZ-2581. Umbrella for Tez Recovery Redesign
(zjffdu)
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index e17a4d4..95ff0cd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -64,7 +64,10 @@ public class HistoryEventHandler extends CompositeService {
addService(historyLoggingService);
if (recoveryEnabled) {
- recoveryService = new RecoveryService(context);
+ String recoveryServiceClass = conf.get(TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS,
+ TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS_DEFAULT);
+ recoveryService = ReflectionUtils.createClazzInstance(recoveryServiceClass,
+ new Class[]{AppContext.class}, new Object[] {context});
addService(recoveryService);
}
super.serviceInit(conf);
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index d791d9e..4e56e9f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -29,7 +29,7 @@ public enum HistoryEventType {
DAG_KILL_REQUEST,
VERTEX_INITIALIZED,
VERTEX_STARTED,
- VERTEX_PARALLELISM_UPDATED,
+ VERTEX_CONFIGURE_DONE,
VERTEX_FINISHED,
TASK_STARTED,
TASK_FINISHED,
@@ -37,7 +37,6 @@ public enum HistoryEventType {
TASK_ATTEMPT_FINISHED,
CONTAINER_LAUNCHED,
CONTAINER_STOPPED,
- VERTEX_DATA_MOVEMENT_EVENTS_GENERATED,
DAG_COMMIT_STARTED,
VERTEX_COMMIT_STARTED,
VERTEX_GROUP_COMMIT_STARTED,
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/RecoveryConverters.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/RecoveryConverters.java b/tez-dag/src/main/java/org/apache/tez/dag/history/RecoveryConverters.java
new file mode 100644
index 0000000..bab713d
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/RecoveryConverters.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.history;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+
+/**
+ * Currently an empty class: it declares no fields, constructors or methods.
+ * NOTE(review): the imports above are not referenced by this class body - confirm whether
+ * converter methods are still to be added here.
+ */
+public class RecoveryConverters {
+
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 7d83db2..21b8719 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -34,10 +34,13 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.utils.TezEventUtils;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.TezEventProto;
+import org.apache.tez.runtime.api.impl.TezEvent;
public class TaskAttemptFinishedEvent implements HistoryEvent {
@@ -55,6 +58,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
private TezCounters tezCounters;
private TaskAttemptTerminationCause error;
private List<DataEventDependencyInfo> dataEvents;
+ private List<TezEvent> taGeneratedEvents;
public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
String vertexName,
@@ -63,7 +67,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
TaskAttemptState state,
TaskAttemptTerminationCause error,
String diagnostics, TezCounters counters,
- List<DataEventDependencyInfo> dataEvents,
+ List<DataEventDependencyInfo> dataEvents,
+ List<TezEvent> taGeneratedEvents,
long creationTime,
TezTaskAttemptID creationCausalTA,
long allocationTime) {
@@ -79,6 +84,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.tezCounters = counters;
this.error = error;
this.dataEvents = dataEvents;
+ this.taGeneratedEvents = taGeneratedEvents;
}
public TaskAttemptFinishedEvent() {
@@ -103,7 +109,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
return dataEvents;
}
- public TaskAttemptFinishedProto toProto() {
+ public TaskAttemptFinishedProto toProto() throws IOException {
TaskAttemptFinishedProto.Builder builder =
TaskAttemptFinishedProto.newBuilder();
builder.setTaskAttemptId(taskAttemptId.toString())
@@ -129,10 +135,15 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
builder.addDataEvents(DataEventDependencyInfo.toProto(info));
}
}
+ if (taGeneratedEvents != null && !taGeneratedEvents.isEmpty()) {
+ for (TezEvent event : taGeneratedEvents) {
+ builder.addTaGeneratedEvents(TezEventUtils.toProto(event));
+ }
+ }
return builder.build();
}
- public void fromProto(TaskAttemptFinishedProto proto) {
+ public void fromProto(TaskAttemptFinishedProto proto) throws IOException {
this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
this.state = TaskAttemptState.values()[proto.getState()];
this.creationTime = proto.getCreationTime();
@@ -158,6 +169,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.dataEvents.add(DataEventDependencyInfo.fromProto(protoEvent));
}
}
+ if (proto.getTaGeneratedEventsCount() > 0) {
+ this.taGeneratedEvents = Lists.newArrayListWithCapacity(proto.getTaGeneratedEventsCount());
+ for (TezEventProto eventProto : proto.getTaGeneratedEventsList()) {
+ this.taGeneratedEvents.add(TezEventUtils.fromProto(eventProto));
+ }
+ }
}
@Override
@@ -236,4 +253,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
return creationCausalTA;
}
+ /**
+  * Returns the taGeneratedEvents list held by this event.
+  *
+  * @return the list passed to the constructor or rebuilt by fromProto; may be null, because
+  *         fromProto only allocates the list when the proto holds at least one such event
+  */
+ public List<TezEvent> getTAGeneratedEvents() {
+   return this.taGeneratedEvents;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
new file mode 100644
index 0000000..4ad1c63
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.RootInputSpecUpdateProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexConfigurationDoneProto;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+
+import com.google.common.collect.Maps;
+
+/**
+ * History event reported as {@link HistoryEventType#VERTEX_CONFIGURE_DONE}.
+ *
+ * <p>It holds a vertex id, a reconfigure-done time, the task count, an optional location hint,
+ * the source edge properties, the root input spec updates and the setParallelismCalledFlag, and
+ * converts all of them to and from {@link VertexConfigurationDoneProto}. The event reports
+ * itself as both a recovery event and a history event.
+ */
+public class VertexConfigurationDoneEvent implements HistoryEvent {
+
+  private TezVertexID vertexID;
+  private long reconfigureDoneTime;
+  private int numTasks;
+  private VertexLocationHint vertexLocationHint;
+  private Map<String, EdgeProperty> sourceEdgeProperties;
+  private Map<String, InputSpecUpdate> rootInputSpecUpdates;
+  private boolean setParallelismCalledFlag;
+
+  /**
+   * Creates an event with every field unset; {@code fromProto} or {@code fromProtoStream}
+   * can populate it afterwards.
+   */
+  public VertexConfigurationDoneEvent() {
+  }
+
+  /**
+   * Creates a fully populated event.
+   *
+   * @param vertexID id of the vertex this event refers to
+   * @param reconfigureDoneTime time value stored with the event and written to the proto
+   * @param numTasks task count stored with the event
+   * @param vertexLocationHint location hint; may be null, in which case it is left out of the proto
+   * @param sourceEdgeProperties edge properties keyed by edge name; may be null
+   * @param rootInputSpecUpdates input spec updates keyed by input name; may be null
+   * @param setParallelismCalledFlag flag stored with the event and written to the proto
+   */
+  public VertexConfigurationDoneEvent(TezVertexID vertexID,
+      long reconfigureDoneTime, int numTasks,
+      VertexLocationHint vertexLocationHint,
+      Map<String, EdgeProperty> sourceEdgeProperties,
+      Map<String, InputSpecUpdate> rootInputSpecUpdates,
+      boolean setParallelismCalledFlag) {
+    this.vertexID = vertexID;
+    // Must be stored: toProto(), toString() and getReconfigureDoneTime() all read this field,
+    // and without the assignment they would always observe 0 for a constructed event.
+    this.reconfigureDoneTime = reconfigureDoneTime;
+    this.numTasks = numTasks;
+    this.vertexLocationHint = vertexLocationHint;
+    this.sourceEdgeProperties = sourceEdgeProperties;
+    this.rootInputSpecUpdates = rootInputSpecUpdates;
+    this.setParallelismCalledFlag = setParallelismCalledFlag;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_CONFIGURE_DONE;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return true;
+  }
+
+  /**
+   * Converts this event into its proto form.
+   *
+   * @return a proto carrying the vertex id, reconfigure-done time, flag and task count, plus the
+   *         location hint, edge properties and root input spec updates when those are non-null
+   */
+  public VertexConfigurationDoneProto toProto() {
+    VertexConfigurationDoneProto.Builder builder =
+        VertexConfigurationDoneProto.newBuilder();
+    builder.setVertexId(vertexID.toString())
+        .setReconfigureDoneTime(reconfigureDoneTime)
+        .setSetParallelismCalledFlag(setParallelismCalledFlag)
+        .setNumTasks(numTasks);
+
+    if (vertexLocationHint != null) {
+      builder.setVertexLocationHint(DagTypeConverters.convertVertexLocationHintToProto(
+          this.vertexLocationHint));
+    }
+    if (sourceEdgeProperties != null) {
+      // One descriptor per map entry: the key becomes the edge name.
+      for (Entry<String, EdgeProperty> entry : sourceEdgeProperties.entrySet()) {
+        EdgeManagerDescriptorProto.Builder edgeMgrBuilder =
+            EdgeManagerDescriptorProto.newBuilder();
+        edgeMgrBuilder.setEdgeName(entry.getKey());
+        edgeMgrBuilder.setEdgeProperty(DagTypeConverters.convertToProto(entry.getValue()));
+        builder.addEdgeManagerDescriptors(edgeMgrBuilder.build());
+      }
+    }
+    if (rootInputSpecUpdates != null) {
+      // One update per map entry: the key becomes the input name.
+      for (Entry<String, InputSpecUpdate> entry : rootInputSpecUpdates.entrySet()) {
+        RootInputSpecUpdateProto.Builder rootInputSpecUpdateBuilder =
+            RootInputSpecUpdateProto.newBuilder();
+        rootInputSpecUpdateBuilder.setInputName(entry.getKey());
+        rootInputSpecUpdateBuilder.setForAllWorkUnits(entry.getValue().isForAllWorkUnits());
+        rootInputSpecUpdateBuilder.addAllNumPhysicalInputs(entry.getValue()
+            .getAllNumPhysicalInputs());
+        builder.addRootInputSpecUpdates(rootInputSpecUpdateBuilder.build());
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Populates this event from its proto form. The location hint, edge property map and root
+   * input spec update map are only assigned when the proto contains them.
+   *
+   * @param proto serialized event to read from
+   */
+  public void fromProto(VertexConfigurationDoneProto proto) {
+    this.vertexID = TezVertexID.fromString(proto.getVertexId());
+    this.reconfigureDoneTime = proto.getReconfigureDoneTime();
+    this.setParallelismCalledFlag = proto.getSetParallelismCalledFlag();
+    this.numTasks = proto.getNumTasks();
+    if (proto.hasVertexLocationHint()) {
+      this.vertexLocationHint = DagTypeConverters.convertVertexLocationHintFromProto(
+          proto.getVertexLocationHint());
+    }
+    if (proto.getEdgeManagerDescriptorsCount() > 0) {
+      this.sourceEdgeProperties = new HashMap<String, EdgeProperty>(
+          proto.getEdgeManagerDescriptorsCount());
+      for (EdgeManagerDescriptorProto edgeManagerProto :
+          proto.getEdgeManagerDescriptorsList()) {
+        EdgeProperty edgeProperty =
+            DagTypeConverters.convertFromProto(edgeManagerProto.getEdgeProperty());
+        sourceEdgeProperties.put(edgeManagerProto.getEdgeName(), edgeProperty);
+      }
+    }
+    if (proto.getRootInputSpecUpdatesCount() > 0) {
+      this.rootInputSpecUpdates = Maps.newHashMap();
+      for (RootInputSpecUpdateProto rootInputSpecUpdateProto :
+          proto.getRootInputSpecUpdatesList()) {
+        InputSpecUpdate specUpdate;
+        if (rootInputSpecUpdateProto.getForAllWorkUnits()) {
+          // An all-work-units update is rebuilt from the value stored at index 0.
+          specUpdate = InputSpecUpdate
+              .createAllTaskInputSpecUpdate(rootInputSpecUpdateProto.getNumPhysicalInputs(0));
+        } else {
+          specUpdate = InputSpecUpdate
+              .createPerTaskInputSpecUpdate(rootInputSpecUpdateProto.getNumPhysicalInputsList());
+        }
+        this.rootInputSpecUpdates.put(rootInputSpecUpdateProto.getInputName(), specUpdate);
+      }
+    }
+  }
+
+  /** Writes the proto form of this event to the stream in length-delimited form. */
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one length-delimited proto from the stream and populates this event from it.
+   *
+   * @throws IOException if no message could be parsed from the stream
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexConfigurationDoneProto proto =
+        VertexConfigurationDoneProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    // String concatenation already renders a null reference as "null".
+    return "vertexId=" + vertexID
+        + ", reconfigureDoneTime=" + reconfigureDoneTime
+        + ", numTasks=" + numTasks
+        + ", vertexLocationHint=" + vertexLocationHint
+        + ", edgeManagersCount="
+        + (sourceEdgeProperties == null ? "null" : sourceEdgeProperties.size())
+        + ", rootInputSpecUpdateCount="
+        + (rootInputSpecUpdates == null ? "null" : rootInputSpecUpdates.size())
+        + ", setParallelismCalledFlag=" + setParallelismCalledFlag;
+  }
+
+  /** Returns the id of the vertex this event refers to. */
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  /** Returns the task count stored with this event. */
+  public int getNumTasks() {
+    return numTasks;
+  }
+
+  /** Returns the location hint, or null when none was supplied or decoded. */
+  public VertexLocationHint getVertexLocationHint() {
+    return vertexLocationHint;
+  }
+
+  /** Returns the edge properties keyed by edge name, or null when none were supplied or decoded. */
+  public Map<String, EdgeProperty> getSourceEdgeProperties() {
+    return sourceEdgeProperties;
+  }
+
+  /** Returns the input spec updates keyed by input name, or null when none were supplied or decoded. */
+  public Map<String, InputSpecUpdate> getRootInputSpecUpdates() {
+    return rootInputSpecUpdates;
+  }
+
+  /** Returns the reconfigure-done time stored with this event. */
+  public long getReconfigureDoneTime() {
+    return reconfigureDoneTime;
+  }
+
+  /** Returns the setParallelismCalledFlag stored with this event. */
+  public boolean isSetParallelismCalled() {
+    return setParallelismCalledFlag;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
index 8947b5f..ec8f3e1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -21,28 +21,35 @@ package org.apache.tez.dag.history.events;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Collection;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitFinishedProto;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEvent {
private TezDAGID dagID;
private String vertexGroupName;
+ private Collection<TezVertexID> vertexIds;
private long commitFinishTime;
public VertexGroupCommitFinishedEvent() {
}
public VertexGroupCommitFinishedEvent(TezDAGID dagID,
- String vertexGroupName, long commitFinishTime) {
+ String vertexGroupName, Collection<TezVertexID> vertexIds, long commitFinishTime) {
this.dagID = dagID;
this.vertexGroupName = vertexGroupName;
+ this.vertexIds = vertexIds;
this.commitFinishTime = commitFinishTime;
}
@@ -62,15 +69,28 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven
}
+ /**
+  * Builds the proto form of this event from the DAG id, the vertex group name and the string
+  * form of every vertex id. commitFinishTime is not written by this method.
+  * NOTE(review): vertexIds is expected to be non-null here; Collections2.transform rejects a
+  * null collection - confirm every caller of the constructor supplies one.
+  */
public VertexGroupCommitFinishedProto toProto() {
+ // View of vertexIds that yields each id's toString() value.
+ Collection<String> vertexIdsStr = Collections2.transform(vertexIds, new Function<TezVertexID, String>(){
+ @Override
+ public String apply(TezVertexID vertexId) {
+ return vertexId.toString();
+ }
+ });
return VertexGroupCommitFinishedProto.newBuilder()
.setDagId(dagID.toString())
.setVertexGroupName(vertexGroupName)
+ .addAllVertexIds(vertexIdsStr)
.build();
}
+ /**
+  * Restores this event from its proto form: DAG id, vertex group name and vertex ids.
+  *
+  * @param proto serialized event to read from
+  */
public void fromProto(VertexGroupCommitFinishedProto proto) {
this.dagID = TezDAGID.fromString(proto.getDagId());
this.vertexGroupName = proto.getVertexGroupName();
+ // Copy the converted ids into a list immediately. Collections2.transform alone returns a lazy
+ // view, which would re-run TezVertexID.fromString on every traversal and would postpone any
+ // parse failure until some later caller iterates the collection.
+ this.vertexIds = new java.util.ArrayList<TezVertexID>(
+     Collections2.transform(proto.getVertexIdsList(), new Function<String, TezVertexID>() {
+       @Override
+       public TezVertexID apply(String input) {
+         return TezVertexID.fromString(input);
+       }
+     }));
}
@Override
@@ -124,4 +144,8 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven
return dagID;
}
+ /**
+  * Returns the vertex ids held by this event.
+  *
+  * @return the collection given to the constructor, or the one built by fromProto from the
+  *         proto's vertex id strings
+  */
+ public Collection<TezVertexID> getVertexIds() {
+   return this.vertexIds;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
index c388957..3de355c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -21,28 +21,35 @@ package org.apache.tez.dag.history.events;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Collection;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitStartedProto;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent {
private TezDAGID dagID;
private String vertexGroupName;
+ private Collection<TezVertexID> vertexIds;
private long commitStartTime;
public VertexGroupCommitStartedEvent() {
}
public VertexGroupCommitStartedEvent(TezDAGID dagID,
- String vertexGroupName, long commitStartTime) {
+ String vertexGroupName, Collection<TezVertexID> vertexIds, long commitStartTime) {
this.dagID = dagID;
this.vertexGroupName = vertexGroupName;
+ this.vertexIds = vertexIds;
this.commitStartTime = commitStartTime;
}
@@ -62,15 +69,28 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent
}
+ /**
+  * Builds the proto form of this event from the DAG id, the vertex group name and the string
+  * form of every vertex id. commitStartTime is not written by this method.
+  * NOTE(review): vertexIds is expected to be non-null here; Collections2.transform rejects a
+  * null collection - confirm every caller of the constructor supplies one.
+  */
public VertexGroupCommitStartedProto toProto() {
+ // View of vertexIds that yields each id's toString() value.
+ Collection<String> vertexIdsStr = Collections2.transform(vertexIds, new Function<TezVertexID, String>(){
+ @Override
+ public String apply(TezVertexID vertexId) {
+ return vertexId.toString();
+ }
+ });
return VertexGroupCommitStartedProto.newBuilder()
.setDagId(dagID.toString())
.setVertexGroupName(vertexGroupName)
+ .addAllVertexIds(vertexIdsStr)
.build();
}
+ /**
+  * Restores this event from its proto form: DAG id, vertex group name and vertex ids.
+  *
+  * @param proto serialized event to read from
+  */
public void fromProto(VertexGroupCommitStartedProto proto) {
this.dagID = TezDAGID.fromString(proto.getDagId());
this.vertexGroupName = proto.getVertexGroupName();
+ // Copy the converted ids into a list immediately. Collections2.transform alone returns a lazy
+ // view, which would re-run TezVertexID.fromString on every traversal and would postpone any
+ // parse failure until some later caller iterates the collection.
+ this.vertexIds = new java.util.ArrayList<TezVertexID>(
+     Collections2.transform(proto.getVertexIdsList(), new Function<String, TezVertexID>() {
+       @Override
+       public TezVertexID apply(String input) {
+         return TezVertexID.fromString(input);
+       }
+     }));
}
@Override
@@ -124,4 +144,7 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent
return dagID;
}
+ /**
+  * Returns the vertex ids held by this event.
+  *
+  * @return the collection given to the constructor, or the one built by fromProto from the
+  *         proto's vertex id strings
+  */
+ public Collection<TezVertexID> getVertexIds() {
+   return this.vertexIds;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index 01e0d3c..052908b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import org.apache.tez.dag.api.DagTypeConverters;
@@ -31,9 +32,14 @@ import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.utils.TezEventUtils;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.TezEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexInitializedProto;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+import com.google.common.collect.Lists;
public class VertexInitializedEvent implements HistoryEvent {
@@ -44,6 +50,7 @@ public class VertexInitializedEvent implements HistoryEvent {
private int numTasks;
private String processorName;
private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs;
+ private List<TezEvent> initGeneratedEvents;
public VertexInitializedEvent() {
}
@@ -51,7 +58,8 @@ public class VertexInitializedEvent implements HistoryEvent {
public VertexInitializedEvent(TezVertexID vertexId,
String vertexName, long initRequestedTime, long initedTime,
int numTasks, String processorName,
- Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs) {
+ Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs,
+ List<TezEvent> initGeneratedEvents) {
this.vertexName = vertexName;
this.vertexID = vertexId;
this.initRequestedTime = initRequestedTime;
@@ -59,6 +67,7 @@ public class VertexInitializedEvent implements HistoryEvent {
this.numTasks = numTasks;
this.processorName = processorName;
this.additionalInputs = additionalInputs;
+ this.initGeneratedEvents = initGeneratedEvents;
}
@Override
@@ -76,7 +85,7 @@ public class VertexInitializedEvent implements HistoryEvent {
return true;
}
- public RecoveryProtos.VertexInitializedProto toProto() {
+ public RecoveryProtos.VertexInitializedProto toProto() throws IOException {
VertexInitializedProto.Builder builder = VertexInitializedProto.newBuilder();
if (additionalInputs != null
&& !additionalInputs.isEmpty()) {
@@ -94,6 +103,11 @@ public class VertexInitializedEvent implements HistoryEvent {
builder.addInputs(inputBuilder.build());
}
}
+ if (initGeneratedEvents != null && !initGeneratedEvents.isEmpty()) {
+ for (TezEvent event : initGeneratedEvents) {
+ builder.addInitGeneratedEvents(TezEventUtils.toProto(event));
+ }
+ }
return builder.setVertexId(vertexID.toString())
.setVertexName(vertexName)
.setInitRequestedTime(initRequestedTime)
@@ -102,7 +116,7 @@ public class VertexInitializedEvent implements HistoryEvent {
.build();
}
- public void fromProto(RecoveryProtos.VertexInitializedProto proto) {
+ public void fromProto(RecoveryProtos.VertexInitializedProto proto) throws IOException {
this.vertexID = TezVertexID.fromString(proto.getVertexId());
this.vertexName = proto.getVertexName();
this.initRequestedTime = proto.getInitRequestedTime();
@@ -123,6 +137,14 @@ public class VertexInitializedEvent implements HistoryEvent {
additionalInputs.put(input.getName(), input);
}
}
+ int eventCount = proto.getInitGeneratedEventsCount();
+ if (eventCount > 0) {
+ this.initGeneratedEvents = Lists.newArrayListWithCapacity(eventCount);
+ }
+ for (TezEventProto eventProto :
+ proto.getInitGeneratedEventsList()) {
+ this.initGeneratedEvents.add(TezEventUtils.fromProto(eventProto));
+ }
}
@Override
@@ -149,7 +171,9 @@ public class VertexInitializedEvent implements HistoryEvent {
+ ", numTasks=" + numTasks
+ ", processorName=" + processorName
+ ", additionalInputsCount="
- + (additionalInputs != null ? additionalInputs.size() : 0);
+ + (additionalInputs != null ? additionalInputs.size() : 0)
+ + ", initGeneratedEventsCount="
+ + (initGeneratedEvents != null ? initGeneratedEvents.size() : 0);
}
public TezVertexID getVertexID() {
@@ -181,4 +205,7 @@ public class VertexInitializedEvent implements HistoryEvent {
return vertexName;
}
+ /**
+  * Returns the initGeneratedEvents list held by this event.
+  *
+  * @return the list passed to the constructor or rebuilt by fromProto; may be null, because
+  *         fromProto only allocates the list when the proto holds at least one such event
+  */
+ public List<TezEvent> getInitGeneratedEvents() {
+   return this.initGeneratedEvents;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
deleted file mode 100644
index 456e2a5..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.events;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.RootInputSpecUpdateProto;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexParallelismUpdatedProto;
-import org.apache.tez.runtime.api.InputSpecUpdate;
-
-import com.google.common.collect.Maps;
-
-public class VertexParallelismUpdatedEvent implements HistoryEvent {
-
- private TezVertexID vertexID;
- private int numTasks;
- private int oldNumTasks;
- private VertexLocationHint vertexLocationHint;
- private Map<String, EdgeProperty> sourceEdgeProperties;
- private Map<String, InputSpecUpdate> rootInputSpecUpdates;
- private long updateTime;
-
- public VertexParallelismUpdatedEvent() {
- }
-
- public VertexParallelismUpdatedEvent(TezVertexID vertexID,
- int numTasks, VertexLocationHint vertexLocationHint,
- Map<String, EdgeProperty> sourceEdgeProperties,
- Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldNumTasks) {
- this.vertexID = vertexID;
- this.numTasks = numTasks;
- this.vertexLocationHint = vertexLocationHint;
- this.sourceEdgeProperties = sourceEdgeProperties;
- this.rootInputSpecUpdates = rootInputSpecUpdates;
- this.updateTime = System.currentTimeMillis();
- this.oldNumTasks = oldNumTasks;
- }
-
- @Override
- public HistoryEventType getEventType() {
- return HistoryEventType.VERTEX_PARALLELISM_UPDATED;
- }
-
- @Override
- public boolean isRecoveryEvent() {
- return true;
- }
-
- @Override
- public boolean isHistoryEvent() {
- return true;
- }
-
- public VertexParallelismUpdatedProto toProto() {
- VertexParallelismUpdatedProto.Builder builder =
- VertexParallelismUpdatedProto.newBuilder();
- builder.setVertexId(vertexID.toString())
- .setNumTasks(numTasks);
- if (vertexLocationHint != null) {
- builder.setVertexLocationHint(DagTypeConverters.convertVertexLocationHintToProto(
- this.vertexLocationHint));
- }
- if (sourceEdgeProperties != null) {
- for (Entry<String, EdgeProperty> entry :
- sourceEdgeProperties.entrySet()) {
- EdgeManagerDescriptorProto.Builder edgeMgrBuilder =
- EdgeManagerDescriptorProto.newBuilder();
- edgeMgrBuilder.setEdgeName(entry.getKey());
- edgeMgrBuilder.setEdgeProperty(DagTypeConverters.convertToProto(entry.getValue()));
- builder.addEdgeManagerDescriptors(edgeMgrBuilder.build());
- }
- }
- if (rootInputSpecUpdates != null) {
- for (Entry<String, InputSpecUpdate> entry : rootInputSpecUpdates.entrySet()) {
- RootInputSpecUpdateProto.Builder rootInputSpecUpdateBuilder = RootInputSpecUpdateProto
- .newBuilder();
- rootInputSpecUpdateBuilder.setInputName(entry.getKey());
- rootInputSpecUpdateBuilder.setForAllWorkUnits(entry.getValue().isForAllWorkUnits());
- rootInputSpecUpdateBuilder.addAllNumPhysicalInputs(entry.getValue()
- .getAllNumPhysicalInputs());
- builder.addRootInputSpecUpdates(rootInputSpecUpdateBuilder.build());
- }
- }
- return builder.build();
- }
-
- public void fromProto(VertexParallelismUpdatedProto proto) {
- this.vertexID = TezVertexID.fromString(proto.getVertexId());
- this.numTasks = proto.getNumTasks();
- if (proto.hasVertexLocationHint()) {
- this.vertexLocationHint = DagTypeConverters.convertVertexLocationHintFromProto(
- proto.getVertexLocationHint());
- }
- if (proto.getEdgeManagerDescriptorsCount() > 0) {
- this.sourceEdgeProperties = new HashMap<String, EdgeProperty>(
- proto.getEdgeManagerDescriptorsCount());
- for (EdgeManagerDescriptorProto edgeManagerProto :
- proto.getEdgeManagerDescriptorsList()) {
- EdgeProperty edgeProperty =
- DagTypeConverters.convertFromProto(
- edgeManagerProto.getEdgeProperty());
- sourceEdgeProperties.put(edgeManagerProto.getEdgeName(),
- edgeProperty);
- }
- }
- if (proto.getRootInputSpecUpdatesCount() > 0) {
- this.rootInputSpecUpdates = Maps.newHashMap();
- for (RootInputSpecUpdateProto rootInputSpecUpdateProto : proto.getRootInputSpecUpdatesList()) {
- InputSpecUpdate specUpdate;
- if (rootInputSpecUpdateProto.getForAllWorkUnits()) {
- specUpdate = InputSpecUpdate
- .createAllTaskInputSpecUpdate(rootInputSpecUpdateProto.getNumPhysicalInputs(0));
- } else {
- specUpdate = InputSpecUpdate
- .createPerTaskInputSpecUpdate(rootInputSpecUpdateProto.getNumPhysicalInputsList());
- }
- this.rootInputSpecUpdates.put(rootInputSpecUpdateProto.getInputName(), specUpdate);
- }
- }
- }
-
- @Override
- public void toProtoStream(OutputStream outputStream) throws IOException {
- toProto().writeDelimitedTo(outputStream);
- }
-
- @Override
- public void fromProtoStream(InputStream inputStream) throws IOException {
- VertexParallelismUpdatedProto proto = VertexParallelismUpdatedProto.parseDelimitedFrom(inputStream);
- if (proto == null) {
- throw new IOException("No data found in stream");
- }
- fromProto(proto);
- }
-
- @Override
- public String toString() {
- return "vertexId=" + vertexID
- + ", numTasks=" + numTasks
- + ", vertexLocationHint=" +
- (vertexLocationHint == null? "null" : vertexLocationHint)
- + ", edgeManagersCount=" +
- (sourceEdgeProperties == null? "null" : sourceEdgeProperties.size()
- + ", rootInputSpecUpdateCount="
- + (rootInputSpecUpdates == null ? "null" : rootInputSpecUpdates.size()));
- }
-
- public TezVertexID getVertexID() {
- return this.vertexID;
- }
-
- public int getNumTasks() {
- return numTasks;
- }
-
- public VertexLocationHint getVertexLocationHint() {
- return vertexLocationHint;
- }
-
- public Map<String, EdgeProperty> getSourceEdgeProperties() {
- return sourceEdgeProperties;
- }
-
- public Map<String, InputSpecUpdate> getRootInputSpecUpdates() {
- return rootInputSpecUpdates;
- }
-
- public long getUpdateTime() {
- return updateTime;
- }
-
- public int getOldNumTasks() {
- return oldNumTasks;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
deleted file mode 100644
index 6f44f33..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.events;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.EnumSet;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.tez.common.ProtoConverters;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.recovery.records.RecoveryProtos;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.TezDataMovementEventProto;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexDataMovementEventsGeneratedProto;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.runtime.api.impl.TezEvent;
-
-import com.google.common.collect.Lists;
-
-// TODO PreCommit - rename this to VertexRecoverableEventGeneratedEvent
-public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
-
- private static final Logger LOG = LoggerFactory.getLogger(
- VertexRecoverableEventsGeneratedEvent.class);
- private List<TezEvent> events;
- private TezVertexID vertexID;
-
- public VertexRecoverableEventsGeneratedEvent(TezVertexID vertexID,
- List<TezEvent> events) {
- this.vertexID = vertexID;
- this.events = Lists.newArrayListWithCapacity(events.size());
- for (TezEvent event : events) {
- if (EnumSet.of(EventType.DATA_MOVEMENT_EVENT,
- EventType.COMPOSITE_DATA_MOVEMENT_EVENT,
- EventType.ROOT_INPUT_DATA_INFORMATION_EVENT,
- EventType.ROOT_INPUT_INITIALIZER_EVENT)
- .contains(event.getEventType())) {
- this.events.add(event);
- }
- }
- if (events.isEmpty()) {
- throw new RuntimeException("Invalid creation of VertexDataMovementEventsGeneratedEvent"
- + ", no data movement/information events provided");
- }
- }
-
- public VertexRecoverableEventsGeneratedEvent() {
- }
-
- @Override
- public HistoryEventType getEventType() {
- return HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED;
- }
-
- @Override
- public boolean isRecoveryEvent() {
- return true;
- }
-
- @Override
- public boolean isHistoryEvent() {
- return false;
- }
-
- static RecoveryProtos.EventMetaDataProto convertEventMetaDataToProto(
- EventMetaData eventMetaData) {
- RecoveryProtos.EventMetaDataProto.Builder builder =
- RecoveryProtos.EventMetaDataProto.newBuilder()
- .setProducerConsumerType(eventMetaData.getEventGenerator().ordinal())
- .setEdgeVertexName(eventMetaData.getEdgeVertexName())
- .setTaskVertexName(eventMetaData.getTaskVertexName());
- if (eventMetaData.getTaskAttemptID() != null) {
- builder.setTaskAttemptId(eventMetaData.getTaskAttemptID().toString());
- }
- return builder.build();
- }
-
- static EventMetaData convertEventMetaDataFromProto(
- RecoveryProtos.EventMetaDataProto proto) {
- TezTaskAttemptID attemptID = null;
- if (proto.hasTaskAttemptId()) {
- attemptID = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
- }
- return new EventMetaData(
- EventMetaData.EventProducerConsumerType.values()[proto.getProducerConsumerType()],
- proto.getTaskVertexName(),
- proto.getEdgeVertexName(),
- attemptID);
- }
-
- public VertexDataMovementEventsGeneratedProto toProto() {
- List<TezDataMovementEventProto> tezEventProtos = null;
- if (events != null) {
- tezEventProtos = Lists.newArrayListWithCapacity(events.size());
- for (TezEvent event : events) {
- TezDataMovementEventProto.Builder evtBuilder =
- TezDataMovementEventProto.newBuilder();
- if (event.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
- evtBuilder.setCompositeDataMovementEvent(
- ProtoConverters.convertCompositeDataMovementEventToProto(
- (CompositeDataMovementEvent) event.getEvent()));
- } else if (event.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)) {
- evtBuilder.setDataMovementEvent(
- ProtoConverters.convertDataMovementEventToProto(
- (DataMovementEvent) event.getEvent()));
- } else if (event.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
- evtBuilder.setRootInputDataInformationEvent(
- ProtoConverters.convertRootInputDataInformationEventToProto(
- (InputDataInformationEvent) event.getEvent()));
- } else if (event.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) {
- evtBuilder.setInputInitializerEvent(ProtoConverters
- .convertRootInputInitializerEventToProto((InputInitializerEvent) event.getEvent()));
- }
- if (event.getSourceInfo() != null) {
- evtBuilder.setSourceInfo(convertEventMetaDataToProto(event.getSourceInfo()));
- }
- if (event.getDestinationInfo() != null) {
- evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
- }
- evtBuilder.setEventTime(event.getEventReceivedTime());
- tezEventProtos.add(evtBuilder.build());
- }
- }
- return VertexDataMovementEventsGeneratedProto.newBuilder()
- .setVertexId(vertexID.toString())
- .addAllTezDataMovementEvent(tezEventProtos)
- .build();
- }
-
- public void fromProto(VertexDataMovementEventsGeneratedProto proto) {
- this.vertexID = TezVertexID.fromString(proto.getVertexId());
- int eventCount = proto.getTezDataMovementEventCount();
- if (eventCount > 0) {
- this.events = Lists.newArrayListWithCapacity(eventCount);
- }
- for (TezDataMovementEventProto eventProto :
- proto.getTezDataMovementEventList()) {
- Event evt = null;
- if (eventProto.hasCompositeDataMovementEvent()) {
- evt = ProtoConverters.convertCompositeDataMovementEventFromProto(
- eventProto.getCompositeDataMovementEvent());
- } else if (eventProto.hasDataMovementEvent()) {
- evt = ProtoConverters.convertDataMovementEventFromProto(
- eventProto.getDataMovementEvent());
- } else if (eventProto.hasRootInputDataInformationEvent()) {
- evt = ProtoConverters.convertRootInputDataInformationEventFromProto(
- eventProto.getRootInputDataInformationEvent());
- } else if (eventProto.hasInputInitializerEvent()) {
- evt = ProtoConverters.convertRootInputInitializerEventFromProto(
- eventProto.getInputInitializerEvent());
- }
- EventMetaData sourceInfo = null;
- EventMetaData destinationInfo = null;
- if (eventProto.hasSourceInfo()) {
- sourceInfo = convertEventMetaDataFromProto(eventProto.getSourceInfo());
- }
- if (eventProto.hasDestinationInfo()) {
- destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
- }
- TezEvent tezEvent = new TezEvent(evt, sourceInfo, eventProto.getEventTime());
- tezEvent.setDestinationInfo(destinationInfo);
- this.events.add(tezEvent);
- }
- }
-
- @Override
- public void toProtoStream(OutputStream outputStream) throws IOException {
- toProto().writeDelimitedTo(outputStream);
- }
-
- @Override
- public void fromProtoStream(InputStream inputStream) throws IOException {
- VertexDataMovementEventsGeneratedProto proto =
- VertexDataMovementEventsGeneratedProto.parseDelimitedFrom(inputStream);
- if (proto == null) {
- throw new IOException("No data found in stream");
- }
- fromProto(proto);
- }
-
- @Override
- public String toString() {
- return "vertexId=" + vertexID.toString()
- + ", eventCount=" + (events != null ? events.size() : "null");
-
- }
-
- public TezVertexID getVertexID() {
- return this.vertexID;
- }
-
- public List<TezEvent> getTezEvents() {
- return this.events;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index bf63045..c4e7e5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -42,7 +42,7 @@ import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.history.utils.DAGUtils;
@@ -108,13 +108,12 @@ public class HistoryEventJsonConversion {
case TASK_ATTEMPT_FINISHED:
jsonObject = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
break;
- case VERTEX_PARALLELISM_UPDATED:
- jsonObject = convertVertexParallelismUpdatedEvent((VertexParallelismUpdatedEvent) historyEvent);
+ case VERTEX_CONFIGURE_DONE:
+ jsonObject = convertVertexReconfigureDoneEvent((VertexConfigurationDoneEvent) historyEvent);
break;
case DAG_RECOVERED:
jsonObject = convertDAGRecoveredEvent((DAGRecoveredEvent) historyEvent);
break;
- case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
case VERTEX_COMMIT_STARTED:
case VERTEX_GROUP_COMMIT_STARTED:
case VERTEX_GROUP_COMMIT_FINISHED:
@@ -662,7 +661,6 @@ public class HistoryEventJsonConversion {
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
otherInfo.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
-
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
@@ -705,6 +703,41 @@ public class HistoryEventJsonConversion {
return jsonObject;
}
+ private static JSONObject convertVertexReconfigureDoneEvent(VertexConfigurationDoneEvent event) throws JSONException { // Builds the JSON entity for a VertexConfigurationDoneEvent, keyed by the vertex id.
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+ // Events
+ JSONArray events = new JSONArray();
+ JSONObject updateEvent = new JSONObject(); // the only entry placed in the events array
+ updateEvent.put(ATSConstants.TIMESTAMP, event.getReconfigureDoneTime());
+ updateEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.VERTEX_CONFIGURE_DONE.name());
+
+ JSONObject eventInfo = new JSONObject();
+ eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
+ if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) { // edge managers are reported only when at least one source edge property exists
+ JSONObject updatedEdgeManagers = new JSONObject();
+ for (Entry<String, EdgeProperty> entry :
+ event.getSourceEdgeProperties().entrySet()) {
+ updatedEdgeManagers.put(entry.getKey(),
+ new JSONObject(DAGUtils.convertEdgeProperty(entry.getValue()))); // one nested object per map key
+ }
+ eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
+ }
+ updateEvent.put(ATSConstants.EVENT_INFO, eventInfo);
+ events.put(updateEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ // Other info
+ JSONObject otherInfo = new JSONObject(); // NOTE(review): attached empty; the convertVertexParallelismUpdatedEvent method removed by this change stored NUM_TASKS here - confirm the omission is intended
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ // TODO add more on all other updated information
+ return jsonObject;
+ }
+
private static JSONObject convertVertexInitializedEvent(VertexInitializedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
@@ -773,42 +806,4 @@ public class HistoryEventJsonConversion {
return jsonObject;
}
- private static JSONObject convertVertexParallelismUpdatedEvent(
- VertexParallelismUpdatedEvent event) throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject updateEvent = new JSONObject();
- updateEvent.put(ATSConstants.TIMESTAMP, event.getUpdateTime());
- updateEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.VERTEX_PARALLELISM_UPDATED.name());
-
- JSONObject eventInfo = new JSONObject();
- eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks());
- eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
- if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) {
- JSONObject updatedEdgeManagers = new JSONObject();
- for (Entry<String, EdgeProperty> entry :
- event.getSourceEdgeProperties().entrySet()) {
- updatedEdgeManagers.put(entry.getKey(),
- new JSONObject(DAGUtils.convertEdgeProperty(entry.getValue())));
- }
- eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
- }
- updateEvent.put(ATSConstants.EVENT_INFO, eventInfo);
- events.put(updateEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- // TODO add more on all other updated information
- return jsonObject;
- }
-
}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 585050d..fed4f3d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -50,7 +50,7 @@ import com.google.common.annotations.VisibleForTesting;
public class RecoveryService extends AbstractService {
private static final Logger LOG = LoggerFactory.getLogger(RecoveryService.class);
- private final AppContext appContext;
+ protected final AppContext appContext;
public static final String RECOVERY_FATAL_OCCURRED_DIR =
"RecoveryFatalErrorOccurred";
@@ -73,7 +73,7 @@ public class RecoveryService extends AbstractService {
private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
private Set<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
- private Thread eventHandlingThread;
+ public Thread eventHandlingThread; // NOTE(review): widened from private to public by this change, so any caller can read or reassign the field - confirm the wider access is required
private AtomicBoolean stopped = new AtomicBoolean(false);
private AtomicBoolean started = new AtomicBoolean(false);
private int eventCounter = 0;
@@ -374,7 +374,7 @@ public class RecoveryService extends AbstractService {
}
}
- private void handleSummaryEvent(TezDAGID dagID,
+ protected void handleSummaryEvent(TezDAGID dagID,
HistoryEventType eventType,
SummaryEvent summaryEvent) throws IOException {
if (LOG.isDebugEnabled()) {
@@ -506,4 +506,8 @@ public class RecoveryService extends AbstractService {
Thread.yield();
}
}
+
+ public void setStopped(boolean stopped) { // Overwrites the service's AtomicBoolean "stopped" flag with the given value; nothing else is touched here.
+ this.stopped.set(stopped);
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java
new file mode 100644
index 0000000..cc89b9f
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.history.utils;
+
+import java.io.IOException;
+import org.apache.tez.common.ProtoConverters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.TezEventProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+public class TezEventUtils { // Static helpers converting between TezEvent / EventMetaData and their proto forms. NOTE(review): all members are static but no private constructor is declared, so the class can still be instantiated.
+
+ public static TezEventProto toProto(TezEvent event) throws IOException { // Serializes a TezEvent; only the five event types tested below are supported.
+ TezEventProto.Builder evtBuilder =
+ TezEventProto.newBuilder();
+ if (event.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
+ evtBuilder.setCompositeDataMovementEvent(
+ ProtoConverters.convertCompositeDataMovementEventToProto(
+ (CompositeDataMovementEvent) event.getEvent()));
+ } else if (event.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)) {
+ evtBuilder.setDataMovementEvent(
+ ProtoConverters.convertDataMovementEventToProto(
+ (DataMovementEvent) event.getEvent()));
+ } else if (event.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) {
+ evtBuilder.setInputInitializerEvent(ProtoConverters
+ .convertRootInputInitializerEventToProto((InputInitializerEvent) event.getEvent()));
+ } else if (event.getEventType().equals(EventType.VERTEX_MANAGER_EVENT)) {
+ evtBuilder.setVmEvent(ProtoConverters
+ .convertVertexManagerEventToProto((VertexManagerEvent)event.getEvent()));
+ } else if (event.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+ evtBuilder.setRootInputDataInformationEvent(
+ ProtoConverters.convertRootInputDataInformationEventToProto(
+ (InputDataInformationEvent) event.getEvent()));
+ } else {
+ throw new IOException("Unsupported TezEvent type:" + event.getEventType()); // every other event type is rejected rather than written without a payload
+ }
+
+ if (event.getSourceInfo() != null) { // source and destination metadata are optional: the proto field stays unset when the value is null
+ evtBuilder.setSourceInfo(convertEventMetaDataToProto(event.getSourceInfo()));
+ }
+ if (event.getDestinationInfo() != null) {
+ evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
+ }
+ evtBuilder.setEventTime(event.getEventReceivedTime());
+ return evtBuilder.build();
+ }
+
+ public static TezEvent fromProto(TezEventProto eventProto) throws IOException { // Rebuilds a TezEvent; the first payload field found set, in the order checked below, is the one used.
+ Event evt = null;
+ if (eventProto.hasCompositeDataMovementEvent()) {
+ evt = ProtoConverters.convertCompositeDataMovementEventFromProto(
+ eventProto.getCompositeDataMovementEvent());
+ } else if (eventProto.hasDataMovementEvent()) {
+ evt = ProtoConverters.convertDataMovementEventFromProto(
+ eventProto.getDataMovementEvent());
+ } else if (eventProto.hasInputInitializerEvent()) {
+ evt = ProtoConverters.convertRootInputInitializerEventFromProto(
+ eventProto.getInputInitializerEvent());
+ } else if (eventProto.hasVmEvent()) {
+ evt = ProtoConverters.convertVertexManagerEventFromProto(
+ eventProto.getVmEvent());
+ } else if (eventProto.hasRootInputDataInformationEvent()) {
+ evt = ProtoConverters.convertRootInputDataInformationEventFromProto(
+ eventProto.getRootInputDataInformationEvent());
+ } else {
+ throw new IOException("Unsupported TezEvent type"); // none of the supported payload fields is set
+ }
+
+ EventMetaData sourceInfo = null; // both metadata values stay null when the proto does not carry them
+ EventMetaData destinationInfo = null;
+ if (eventProto.hasSourceInfo()) {
+ sourceInfo = convertEventMetaDataFromProto(eventProto.getSourceInfo());
+ }
+ if (eventProto.hasDestinationInfo()) {
+ destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
+ }
+ TezEvent tezEvent = new TezEvent(evt, sourceInfo, eventProto.getEventTime());
+ tezEvent.setDestinationInfo(destinationInfo); // destination info is applied through the setter after construction
+ return tezEvent;
+ }
+
+ public static RecoveryProtos.EventMetaDataProto convertEventMetaDataToProto( // Converts EventMetaData to its proto form.
+ EventMetaData eventMetaData) {
+ RecoveryProtos.EventMetaDataProto.Builder builder =
+ RecoveryProtos.EventMetaDataProto.newBuilder()
+ .setProducerConsumerType(eventMetaData.getEventGenerator().ordinal()) // NOTE(review): stored as the enum ordinal, so written data depends on the declaration order of the enum - confirm that order is treated as stable
+ .setEdgeVertexName(eventMetaData.getEdgeVertexName())
+ .setTaskVertexName(eventMetaData.getTaskVertexName());
+ if (eventMetaData.getTaskAttemptID() != null) { // the attempt id is optional and written in its string form
+ builder.setTaskAttemptId(eventMetaData.getTaskAttemptID().toString());
+ }
+ return builder.build();
+ }
+
+ public static EventMetaData convertEventMetaDataFromProto( // Rebuilds EventMetaData from its proto form.
+ RecoveryProtos.EventMetaDataProto proto) {
+ TezTaskAttemptID attemptID = null; // stays null when the proto has no task attempt id
+ if (proto.hasTaskAttemptId()) {
+ attemptID = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
+ }
+ return new EventMetaData(
+ EventMetaData.EventProducerConsumerType.values()[proto.getProducerConsumerType()], // lookup by stored ordinal; a value outside the enum's range throws ArrayIndexOutOfBoundsException
+ proto.getTaskVertexName(),
+ proto.getEdgeVertexName(),
+ attemptID);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 5392d8a..b9e4507 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -90,6 +90,8 @@ message VertexInitializedProto {
optional int64 init_time = 4;
optional int32 num_tasks = 5;
repeated RootInputLeafOutputProto inputs = 6;
+ repeated TezEventProto init_generated_events = 7;
+ optional bool isReconfigurePlanned = 8;
}
message VertexStartedProto {
@@ -99,6 +101,16 @@ message VertexStartedProto {
optional int64 start_time = 4;
}
+message VertexConfigurationDoneProto { // Added by this change; carries the fields of the removed VertexParallelismUpdatedProto plus reconfigure_done_time and setParallelismCalled_flag.
+ optional string vertex_id = 1;
+ optional int64 reconfigure_done_time = 2; // new field; each field carried over from VertexParallelismUpdatedProto has its number shifted up by one after it
+ optional int32 num_tasks = 3;
+ optional VertexLocationHintProto vertex_location_hint = 4;
+ repeated EdgeManagerDescriptorProto edge_manager_descriptors = 5;
+ repeated RootInputSpecUpdateProto root_input_spec_updates = 6;
+ optional bool setParallelismCalled_flag = 7; // NOTE(review): mixes camelCase and snake_case, unlike the snake_case sibling fields; renaming it would change the generated accessor names
+}
+
message EdgeManagerDescriptorProto {
optional string edge_name = 1;
optional PlanEdgeProperty edge_property = 2;
@@ -110,14 +122,6 @@ message RootInputSpecUpdateProto {
repeated int32 num_physical_inputs = 3;
}
-message VertexParallelismUpdatedProto {
- optional string vertex_id = 1;
- optional int32 num_tasks = 2;
- optional VertexLocationHintProto vertex_location_hint = 3;
- repeated EdgeManagerDescriptorProto edge_manager_descriptors = 4;
- repeated RootInputSpecUpdateProto root_input_spec_updates = 5;
-}
-
message VertexCommitStartedProto {
optional string vertex_id = 1;
}
@@ -129,11 +133,13 @@ message VertexCommitFinishedProto {
message VertexGroupCommitStartedProto {
optional string dag_id = 1;
optional string vertex_group_name = 2;
+ repeated string vertex_ids = 3;
}
message VertexGroupCommitFinishedProto {
optional string dag_id = 1;
optional string vertex_group_name = 2;
+ repeated string vertex_ids = 3;
}
message VertexFinishedProto {
@@ -183,6 +189,7 @@ message TaskAttemptFinishedProto {
optional TezCountersProto counters = 9;
optional string error_enum = 10;
repeated DataEventDependencyInfoProto data_events = 11;
+ repeated TezEventProto ta_generated_events = 12;
}
message EventMetaDataProto {
@@ -192,7 +199,7 @@ message EventMetaDataProto {
optional string task_attempt_id = 4;
}
-message TezDataMovementEventProto {
+message TezEventProto {
optional EventMetaDataProto source_info = 1;
optional EventMetaDataProto destination_info = 2;
optional DataMovementEventProto data_movement_event = 3;
@@ -200,11 +207,7 @@ message TezDataMovementEventProto {
optional RootInputDataInformationEventProto root_input_data_information_event = 5;
optional RootInputInitializerEventProto input_initializer_event = 6;
optional int64 event_time = 7;
-}
-
-message VertexDataMovementEventsGeneratedProto {
- optional string vertex_id = 1;
- repeated TezDataMovementEventProto tez_data_movement_event = 2;
+ optional VertexManagerEventProto vm_event = 8;
}
message SummaryEventProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
index 521ed50..0159372 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
@@ -33,12 +33,7 @@ public class TestVertexStatusBuilder {
VertexStatusBuilder.getProtoState(state);
VertexStatus.State clientState =
VertexStatus.getState(stateProto);
- if (state.equals(VertexState.RECOVERING)) {
- Assert.assertEquals(clientState.name(),
- State.NEW.name());
- } else {
- Assert.assertEquals(state.name(), clientState.name());
- }
+ Assert.assertEquals(state.name(), clientState.name());
}
}