You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/09/11 20:16:59 UTC
git commit: Revert due to build failure. "TEZ-1345. Add checks to
guarantee all init events are written to recovery to consider vertex
initialized. (Jeff Zhang via hitesh)"
Repository: tez
Updated Branches:
refs/heads/master 2bdac2749 -> d6589d3ac
Revert due to build failure. "TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized. (Jeff Zhang via hitesh)"
This reverts commit 2bdac27495f6899b766e537576e7edf6df53c7a3.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d6589d3a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d6589d3a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d6589d3a
Branch: refs/heads/master
Commit: d6589d3accf01ce77adea74b8595f70431611563
Parents: 2bdac27
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Sep 11 11:13:20 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Sep 11 11:13:20 2014 -0700
----------------------------------------------------------------------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 7 +-
.../tez/dag/app/dag/impl/VertexManager.java | 28 ++++----
.../tez/dag/app/dag/impl/TestVertexManager.java | 76 --------------------
.../org/apache/tez/test/TestDAGRecovery.java | 67 +----------------
.../apache/tez/test/dag/MultiAttemptDAG.java | 35 +--------
5 files changed, 20 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/d6589d3a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 670f445..ff556ba 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -156,6 +156,7 @@ import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
@@ -2761,12 +2762,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventRootInputInitialized liInitEvent = (VertexEventRootInputInitialized) event;
VertexState state = vertex.getState();
if (state == VertexState.INITIALIZING) {
- List<TezEvent> inputInfoEvents =
- vertex.vertexManager.onRootVertexInitialized(
+ vertex.vertexManager.onRootVertexInitialized(
liInitEvent.getInputName(),
vertex.getAdditionalInputs().get(liInitEvent.getInputName())
.getIODescriptor(), liInitEvent.getEvents());
- ROUTE_EVENT_TRANSITION.transition(vertex, new VertexEventRouteEvent(vertex.vertexId, inputInfoEvents));
}
vertex.numInitializedInputs++;
@@ -2907,7 +2906,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ " initWaitsForRootInitializers: " + initWaitsForRootInitializers);
return false;
}
-
+
public static class StartWhileInitializingTransition implements
SingleArcTransition<VertexImpl, VertexEvent> {
http://git-wip-us.apache.org/repos/asf/tez/blob/d6589d3a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index c2ff660..177b946 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
@@ -45,6 +44,7 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.Event;
@@ -68,11 +68,10 @@ public class VertexManager {
VertexManagerPluginContextImpl pluginContext;
UserPayload payload = null;
AppContext appContext;
- ConcurrentHashMap<String, List<TezEvent>> cachedRootInputEventMap;
-
+
class VertexManagerPluginContextImpl implements VertexManagerPluginContext {
// TODO Add functionality to allow VertexManagers to send VertexManagerEvents
-
+
private EventMetaData rootEventSourceMetadata = new EventMetaData(EventProducerConsumerType.INPUT,
managedVertex.getName(), "NULL_VERTEX", null);
private Map<String, EventMetaData> destinationEventMetadataMap = Maps.newHashMap();
@@ -81,7 +80,7 @@ public class VertexManager {
public Map<String, EdgeProperty> getInputVertexEdgeProperties() {
// TODO Something similar for Initial Inputs - payload etc visible
Map<Vertex, Edge> inputs = managedVertex.getInputVertices();
- Map<String, EdgeProperty> vertexEdgeMap =
+ Map<String, EdgeProperty> vertexEdgeMap =
Maps.newHashMapWithExpectedSize(inputs.size());
for (Map.Entry<Vertex, Edge> entry : inputs.entrySet()) {
vertexEdgeMap.put(entry.getKey().getName(), entry.getValue().getEdgeProperty());
@@ -116,7 +115,7 @@ public class VertexManager {
@Override
public Set<String> getVertexInputNames() {
Set<String> inputNames = null;
- Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+ Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
inputs = managedVertex.getAdditionalInputs();
if (inputs != null) {
inputNames = inputs.keySet();
@@ -129,6 +128,7 @@ public class VertexManager {
return payload;
}
+ @SuppressWarnings("unchecked")
@Override
public void addRootInputEvents(final String inputName,
Collection<InputDataInformationEvent> events) {
@@ -142,8 +142,8 @@ public class VertexManager {
return tezEvent;
}
});
-
- cachedRootInputEventMap.put(inputName,Lists.newArrayList(tezEvents));
+ appContext.getEventHandler().handle(
+ new VertexEventRouteEvent(managedVertex.getVertexId(), Lists.newArrayList(tezEvents)));
// Recovery handling is taken care of by the Vertex.
}
@@ -201,7 +201,7 @@ public class VertexManager {
}
}
- public VertexManager(VertexManagerPluginDescriptor pluginDesc,
+ public VertexManager(VertexManagerPluginDescriptor pluginDesc,
Vertex managedVertex, AppContext appContext) {
checkNotNull(pluginDesc, "pluginDesc is null");
checkNotNull(managedVertex, "managedVertex is null");
@@ -209,13 +209,12 @@ public class VertexManager {
this.pluginDesc = pluginDesc;
this.managedVertex = managedVertex;
this.appContext = appContext;
- this.cachedRootInputEventMap = new ConcurrentHashMap<String, List<TezEvent>>();
}
-
+
public VertexManagerPlugin getPlugin() {
return plugin;
}
-
+
public void initialize() {
pluginContext = new VertexManagerPluginContextImpl();
if (pluginDesc != null) {
@@ -247,7 +246,7 @@ public class VertexManager {
public void onSourceTaskCompleted(TezTaskID tezTaskId) {
Integer taskId = new Integer(tezTaskId.getId());
- String vertexName =
+ String vertexName =
appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
plugin.onSourceTaskCompleted(vertexName, taskId);
}
@@ -256,9 +255,8 @@ public class VertexManager {
plugin.onVertexManagerEventReceived(vmEvent);
}
- public List<TezEvent> onRootVertexInitialized(String inputName,
+ public void onRootVertexInitialized(String inputName,
InputDescriptor inputDescriptor, List<Event> events) {
plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
- return cachedRootInputEventMap.get(inputName);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/d6589d3a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
deleted file mode 100644
index b3e66bc..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.dag.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.junit.Test;
-
-public class TestVertexManager {
-
- @Test
- public void testOnRootVertexInitialized() {
- Vertex mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
- AppContext mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
- doReturn("vertex1").when(mockVertex).getName();
- when(
- mockAppContext.getCurrentDAG().getVertex(any(String.class))
- .getTotalTasks()).thenReturn(1);
-
- VertexManager vm =
- new VertexManager(
- VertexManagerPluginDescriptor.create(RootInputVertexManager.class
- .getName()), mockVertex, mockAppContext);
- vm.initialize();
- InputDescriptor id1 = mock(InputDescriptor.class);
- List<Event> events1 = new LinkedList<Event>();
- InputDataInformationEvent diEvent1 =
- InputDataInformationEvent.createWithSerializedPayload(0, null);
- events1.add(diEvent1);
- List<TezEvent> tezEvents1 =
- vm.onRootVertexInitialized("input1", id1, events1);
- assertEquals(1, tezEvents1.size());
- assertEquals(diEvent1, tezEvents1.get(0).getEvent());
-
- InputDescriptor id2 = mock(InputDescriptor.class);
- List<Event> events2 = new LinkedList<Event>();
- InputDataInformationEvent diEvent2 =
- InputDataInformationEvent.createWithSerializedPayload(0, null);
- events2.add(diEvent2);
- List<TezEvent> tezEvents2 =
- vm.onRootVertexInitialized("input1", id2, events2);
- assertEquals(tezEvents2.size(), 1);
- assertEquals(diEvent2, tezEvents2.get(0).getEvent());
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/d6589d3a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 7676313..c9acdc2 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -24,30 +24,21 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
-import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatus.State;
-import org.apache.tez.dag.app.RecoveryParser;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
-import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.test.dag.MultiAttemptDAG;
import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer;
import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
-import org.apache.tez.test.dag.MultiAttemptDAG.TestRootInputInitializer;
import org.apache.tez.test.dag.SimpleVTestDAG;
import org.junit.After;
import org.junit.AfterClass;
@@ -57,7 +48,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
-import java.util.List;
import java.util.Random;
public class TestDAGRecovery {
@@ -71,7 +61,6 @@ public class TestDAGRecovery {
private static MiniDFSCluster dfsCluster = null;
private static TezClient tezSession = null;
private static FileSystem remoteFs = null;
- private static TezConfiguration tezConf = null;
@BeforeClass
public static void beforeClass() throws Exception {
@@ -131,7 +120,7 @@ public class TestDAGRecovery {
.valueOf(new Random().nextInt(100000))));
TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
- tezConf = new TezConfiguration(miniTezCluster.getConfig());
+ TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
@@ -141,7 +130,6 @@ public class TestDAGRecovery {
tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
- tezConf.set(TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, "false");
tezSession = TezClient.create("TestDAGRecovery", tezConf);
tezSession.start();
@@ -177,64 +165,13 @@ public class TestDAGRecovery {
Assert.assertEquals(finalState, dagStatus.getState());
}
- private void verifyRecoveryLog() throws IOException{
- ApplicationId appId = tezSession.getAppMasterApplicationId();
- Path tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(tezConf, appId.toString());
- Path recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf);
-
- FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf);
- for (int i=1; i<=3; ++i) {
- Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,i);
- Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir,
- appId.toString().replace("application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
- List<HistoryEvent> historyEvents = RecoveryParser.parseDAGRecoveryFile(
- fs.open(recoveryFilePath));
-
- int inputInfoEventIndex = -1;
- int vertexInitedEventIndex = -1;
- for (int j=0;j<historyEvents.size(); ++j) {
- HistoryEvent historyEvent = historyEvents.get(j);
- LOG.info("Parsed event from recovery stream"
- + ", eventType=" + historyEvent.getEventType()
- + ", event=" + historyEvent);
- if (historyEvent.getEventType() == HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED) {
- VertexDataMovementEventsGeneratedEvent dmEvent =
- (VertexDataMovementEventsGeneratedEvent)historyEvent;
- // TODO do not need to check whether it is -1 after Tez-1521 is resolved
- if (dmEvent.getVertexID().getId() == 0 && inputInfoEventIndex == -1) {
- inputInfoEventIndex = j;
- }
- }
- if (historyEvent.getEventType() == HistoryEventType.VERTEX_INITIALIZED) {
- VertexInitializedEvent vInitedEvent = (VertexInitializedEvent) historyEvent;
- if (vInitedEvent.getVertexID().getId() == 0) {
- vertexInitedEventIndex = j;
- }
- }
- }
- // v1's init events must be logged before its VertexInitializedEvent (Tez-1345)
- Assert.assertTrue("can not find VERTEX_DATA_MOVEMENT_EVENTS_GENERATED for v1", inputInfoEventIndex != -1);
- Assert.assertTrue("can not find VERTEX_INITIALIZED for v1", vertexInitedEventIndex != -1);
- Assert.assertTrue("VERTEX_DATA_MOVEMENT_EVENTS_GENERATED is logged before VERTEX_INITIALIZED for v1",
- inputInfoEventIndex < vertexInitedEventIndex);
- }
- }
-
@Test(timeout=120000)
public void testBasicRecovery() throws Exception {
DAG dag = MultiAttemptDAG.createDAG("TestBasicRecovery", null);
- // add input to v1 to make sure that there will be init events for v1 (TEZ-1345)
- DataSourceDescriptor dataSource =
- DataSourceDescriptor.create(InputDescriptor.create(NoOpInput.class.getName()),
- InputInitializerDescriptor.create(TestRootInputInitializer.class.getName()), null);
- dag.getVertex("v1").addDataSource("Input", dataSource);
-
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
- verifyRecoveryLog();
-
// it should fail if submitting same dags in recovery mode (TEZ-1064)
- try {
+ try{
DAGClient dagClient = tezSession.submitDAG(dag);
Assert.fail("Expected DAG submit to fail on duplicate dag name");
} catch (TezException e) {
http://git-wip-us.apache.org/repos/asf/tez/blob/d6589d3a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 58b9413..7fc9ad7 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import java.nio.ByteBuffer;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -54,15 +53,12 @@ import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.Writer;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.test.TestInput;
import org.apache.tez.test.TestOutput;
import org.apache.tez.test.TestProcessor;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -152,15 +148,8 @@ public class MultiAttemptDAG {
}
@Override
- public void onRootVertexInitialized(String inputName,
- InputDescriptor inputDescriptor, List<Event> events) {
- List<InputDataInformationEvent> inputInfoEvents = new ArrayList<InputDataInformationEvent>();
- for (Event event: events) {
- if (event instanceof InputDataInformationEvent) {
- inputInfoEvents.add((InputDataInformationEvent)event);
- }
- }
- getContext().addRootInputEvents(inputName, inputInfoEvents);
+ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
+ // Do nothing
}
}
@@ -225,26 +214,6 @@ public class MultiAttemptDAG {
}
}
- public static class TestRootInputInitializer extends InputInitializer {
-
- public TestRootInputInitializer(InputInitializerContext initializerContext) {
- super(initializerContext);
- }
-
- @Override
- public List<Event> initialize() throws Exception {
- List<Event> events = new ArrayList<Event>();
- events.add(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.allocate(0)));
- return events;
- }
-
- @Override
- public void handleInputInitializerEvent(List<InputInitializerEvent> events)
- throws Exception {
- throw new UnsupportedOperationException("Not supported");
- }
- }
-
public static class FailingInputInitializer extends InputInitializer {
public FailingInputInitializer(InputInitializerContext initializerContext) {