You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/24 23:03:43 UTC

git commit: TEZ-1568. Add system test for propagation of diagnostics for errors (Jeff Zhang via bikas)

Repository: tez
Updated Branches:
  refs/heads/master 485f675bd -> df375e82f


TEZ-1568. Add system test for propagation of diagnostics for errors (Jeff Zhang via bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/df375e82
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/df375e82
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/df375e82

Branch: refs/heads/master
Commit: df375e82f964fb77982d0a8bfad62bab93bdb0eb
Parents: 485f675
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Sep 24 14:03:35 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Sep 24 14:03:35 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/test/TestExceptionPropagation.java      | 510 +++++++++++++++++++
 2 files changed, 511 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/df375e82/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2307c6f..eda417a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@ ALL CHANGES:
   TEZ-1607. support mr envs in mrrsleep and testorderedwordcount
   TEZ-1499. Add SortMergeJoinExample to tez-examples
   TEZ-1613. Decrease running time for TestAMRecovery
+  TEZ-1240. Add system test for propagation of diagnostics for errors
 
 Release 0.5.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/df375e82/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
new file mode 100644
index 0000000..454f603
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -0,0 +1,510 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.junit.Test;
+
+/**
+ * System test verifying that an exception thrown from a user
+ * Input/Output/Processor/InputInitializer surfaces in the DAG diagnostics and,
+ * in non-session mode, in the YARN application report as well.
+ * <p>
+ * Each DAG carries the name of an {@link ExceptionLocation} as the user
+ * payload of every component; the component matching that location throws an
+ * exception whose message is the location name, which the tests then look for
+ * in the diagnostics.
+ */
+public class TestExceptionPropagation {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestExceptionPropagation.class);
+
+  /** Charset used to encode and decode the {@link ExceptionLocation} payload. */
+  private static final Charset PAYLOAD_CHARSET = Charset.forName("UTF-8");
+
+  /** YARN application states from which the application will not move on. */
+  private static final Set<YarnApplicationState> FINAL_APPLICATION_STATES =
+      EnumSet.of(YarnApplicationState.KILLED, YarnApplicationState.FAILED,
+          YarnApplicationState.FINISHED);
+
+  private static final Configuration conf = new Configuration();
+  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestExceptionPropagation.class.getName() + "-tmpDir";
+
+  private static TezConfiguration tezConf;
+  private static MiniTezCluster miniTezCluster = null;
+  private static MiniDFSCluster dfsCluster = null;
+  private static FileSystem remoteFs = null;
+
+  private static TezClient tezSession = null;
+  private static TezClient tezClient = null;
+
+  /**
+   * Starts a 3-datanode MiniDFSCluster rooted under {@link #TEST_ROOT_DIR} and
+   * a MiniTezCluster whose default file system is that HDFS instance.
+   *
+   * @throws RuntimeException if the MiniDFSCluster cannot be started
+   */
+  private void startMiniTezCluster() {
+    LOG.info("Starting mini clusters");
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster =
+          new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true)
+              .racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+    miniTezCluster =
+        new MiniTezCluster(TestExceptionPropagation.class.getName(), 1, 1, 1);
+    Configuration miniTezconf = new Configuration(conf);
+    miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4);
+    miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    miniTezCluster.init(miniTezconf);
+    miniTezCluster.start();
+  }
+
+  /**
+   * Best-effort shutdown of the MiniTezCluster and the MiniDFSCluster. A
+   * failure to stop one is logged and does not prevent stopping the other; the
+   * static references are cleared so a repeated call is a no-op.
+   */
+  private void stopTezMiniCluster() {
+    if (miniTezCluster != null) {
+      try {
+        LOG.info("Stopping MiniTezCluster");
+        miniTezCluster.stop();
+      } catch (Exception e) {
+        LOG.warn("Failed to stop MiniTezCluster", e);
+      }
+      miniTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      try {
+        LOG.info("Stopping DFSCluster");
+        dfsCluster.shutdown();
+      } catch (Exception e) {
+        LOG.warn("Failed to stop DFSCluster", e);
+      }
+      dfsCluster = null;
+      remoteFs = null;
+    }
+  }
+
+  /**
+   * Applies the AM settings shared by the session and the non-session client.
+   *
+   * @param amConf configuration to modify in place
+   */
+  private static void applyCommonAmSettings(TezConfiguration amConf) {
+    amConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
+    amConf
+        .setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+    amConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+    amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
+    amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
+  }
+
+  /**
+   * Starts a Tez session client in local mode against the local file system,
+   * so no mini cluster is required.
+   *
+   * @throws Exception if the client fails to start
+   */
+  private void startSessionClient() throws Exception {
+    LOG.info("Starting session");
+    tezConf = new TezConfiguration();
+    applyCommonAmSettings(tezConf);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+    // for local mode
+    tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    tezConf.set("fs.defaultFS", "file:///");
+    tezConf.setBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+
+    tezSession = TezClient.create("TestExceptionPropagation", tezConf);
+    tezSession.start();
+  }
+
+  /** Stops the session client (if any), logging rather than throwing on failure. */
+  private void stopSessionClient() {
+    stopClientQuietly(tezSession, "Stopping Tez Session");
+    tezSession = null;
+  }
+
+  /**
+   * Starts a non-session Tez client configured from the running
+   * MiniTezCluster; {@link #startMiniTezCluster()} must have been called first.
+   *
+   * @throws Exception if the client fails to start
+   */
+  private void startNonSessionClient() throws Exception {
+    LOG.info("Starting Client");
+    tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    applyCommonAmSettings(tezConf);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
+
+    tezClient = TezClient.create("TestExceptionPropagation", tezConf);
+    tezClient.start();
+  }
+
+  /** Stops the non-session client (if any), logging rather than throwing on failure. */
+  private void stopNonSessionClient() {
+    stopClientQuietly(tezClient, "Stopping Tez Client");
+    tezClient = null;
+  }
+
+  /**
+   * Stops {@code client} if it is non-null; any exception is logged so that
+   * test cleanup can continue.
+   *
+   * @param client client to stop, may be null
+   * @param message message logged before stopping
+   */
+  private static void stopClientQuietly(TezClient client, String message) {
+    if (client == null) {
+      return;
+    }
+    try {
+      LOG.info(message);
+      client.stop();
+    } catch (Exception e) {
+      LOG.warn("Failed to stop TezClient", e);
+    }
+  }
+
+  /**
+   * verify the diagnostics in DAGStatus is correct in session mode, using local
+   * mode for fast speed. One DAG is submitted per {@link ExceptionLocation}.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testExceptionPropagationSession() throws Exception {
+    try {
+      startSessionClient();
+      for (ExceptionLocation exLocation : ExceptionLocation.values()) {
+        LOG.info("Session mode, Test for Exception from:" + exLocation.name());
+        DAG dag = createDAG(exLocation);
+        DAGClient dagClient = tezSession.submitDAG(dag);
+        DAGStatus dagStatus = dagClient.waitForCompletion();
+        String diagnostics = StringUtils.join(dagStatus.getDiagnostics(), ",");
+        LOG.info("Diagnostics:" + diagnostics);
+        assertTrue("DAG diagnostics should mention " + exLocation.name()
+            + " but were: " + diagnostics,
+            diagnostics.contains(exLocation.name()));
+      }
+    } finally {
+      stopSessionClient();
+    }
+  }
+
+  /**
+   * verify the diagnostics in {@link DAGStatus} is correct in non-session mode,
+   * and also verify that diagnostics from {@link DAGStatus} should match that
+   * from {@link ApplicationReport}
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testExceptionPropagationNonSession() throws Exception {
+    try {
+      startMiniTezCluster();
+      startNonSessionClient();
+
+      ExceptionLocation exLocation = ExceptionLocation.INPUT_START;
+      LOG.info("NonSession mode, Test for Exception from:" + exLocation.name());
+      DAG dag = createDAG(exLocation);
+      DAGClient dagClient = tezClient.submitDAG(dag);
+      DAGStatus dagStatus = dagClient.waitForCompletion();
+      String diagnostics = StringUtils.join(dagStatus.getDiagnostics(), ",");
+      LOG.info("Diagnostics:" + diagnostics);
+      assertTrue("DAG diagnostics should mention " + exLocation.name()
+          + " but were: " + diagnostics,
+          diagnostics.contains(exLocation.name()));
+
+      // wait for app complete (unregisterApplicationMaster is done)
+      ApplicationReport appReport =
+          waitForFinalApplicationReport(tezClient.getAppMasterApplicationId());
+      String appDiagnostics = appReport.getDiagnostics();
+      assertNotNull("ApplicationReport diagnostics should not be null",
+          appDiagnostics);
+      assertTrue("Application diagnostics should mention " + exLocation.name()
+          + " but were: " + appDiagnostics,
+          appDiagnostics.contains(exLocation.name()));
+      // use "\n" as separator, because we also use it in Tez internally when
+      // assembling the application diagnostics.
+      assertEquals(StringUtils.join(dagStatus.getDiagnostics(), "\n").trim(),
+          appDiagnostics.trim());
+    } finally {
+      stopNonSessionClient();
+      try {
+        if (miniTezCluster != null) {
+          // NOTE(review): fixed pause kept from the original test, presumably
+          // to let the AM finish shutting down before the cluster goes away.
+          Thread.sleep(10 * 1000);
+        }
+      } finally {
+        // Runs even if the pause above is interrupted.
+        stopTezMiniCluster();
+      }
+    }
+  }
+
+  /**
+   * Polls the ResourceManager once per second until {@code appId} reaches one
+   * of {@link #FINAL_APPLICATION_STATES}, then waits one more second and
+   * returns a freshly fetched report. The YarnClient used for polling is
+   * always stopped.
+   *
+   * @param appId application to wait for
+   * @return the report fetched after the application reached a final state
+   * @throws Exception if the report cannot be fetched or the wait is interrupted
+   */
+  private ApplicationReport waitForFinalApplicationReport(ApplicationId appId)
+      throws Exception {
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    try {
+      yarnClient.init(tezConf);
+      yarnClient.start();
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      logAppReport(appReport);
+      while (!FINAL_APPLICATION_STATES.contains(appReport
+          .getYarnApplicationState())) {
+        Thread.sleep(1000);
+        appReport = yarnClient.getApplicationReport(appId);
+        logAppReport(appReport);
+      }
+      // wait for 1 second and call getApplicationReport again to ensure get the
+      // diagnostics
+      // TODO remove it after YARN-2560
+      Thread.sleep(1000);
+      appReport = yarnClient.getApplicationReport(appId);
+      logAppReport(appReport);
+      return appReport;
+    } finally {
+      yarnClient.stop();
+    }
+  }
+
+  /** Logs the final status and diagnostics carried by {@code appReport}. */
+  private static void logAppReport(ApplicationReport appReport) {
+    LOG.info("FinalAppStatus:" + appReport.getFinalApplicationStatus());
+    LOG.info("Diagnostics from appReport:" + appReport.getDiagnostics());
+  }
+
+  /**
+   * Places where a component of the test DAG can be told to throw. The
+   * declaration order is the order in which the session test exercises them.
+   */
+  public enum ExceptionLocation {
+    INPUT_START, INPUT_GET_READER, INPUT_HANDLE_EVENTS, INPUT_CLOSE, INPUT_INITIALIZE, OUTPUT_START, OUTPUT_GET_WRITER,
+    // Not Supported yet
+    // OUTPUT_HANDLE_EVENTS,
+    OUTPUT_CLOSE, OUTPUT_INITIALIZE,
+    // Not Supported yet
+    // PROCESSOR_HANDLE_EVENTS
+    PROCESSOR_RUN, PROCESSOR_CLOSE, PROCESSOR_INITIALIZE,
+
+  }
+
+  /**
+   * Encodes {@code exLocation} as the user payload understood by the
+   * *WithException components.
+   */
+  private static UserPayload toPayload(ExceptionLocation exLocation) {
+    return UserPayload.create(ByteBuffer.wrap(exLocation.name().getBytes(
+        PAYLOAD_CHARSET)));
+  }
+
+  /**
+   * Decodes a payload produced by {@link #toPayload(ExceptionLocation)}.
+   *
+   * @throws IllegalArgumentException if the payload is not the name of an
+   *           {@link ExceptionLocation} constant
+   */
+  private static ExceptionLocation toExceptionLocation(UserPayload payload) {
+    return ExceptionLocation.valueOf(new String(payload.deepCopyAsArray(),
+        PAYLOAD_CHARSET));
+  }
+
+  /**
+   * create a DAG with single vertex, set payload on Input/Output/Processor to
+   * control where throw exception
+   *
+   * @param exLocation where the exception should be thrown
+   * @return a single-vertex DAG named {@code "dag_" + exLocation.name()}
+   */
+  private DAG createDAG(ExceptionLocation exLocation) {
+    DAG dag = DAG.create("dag_" + exLocation.name());
+    UserPayload payload = toPayload(exLocation);
+    Vertex v1 =
+        Vertex.create("v1", ProcessorWithException.getProcDesc(payload), 1);
+    InputDescriptor inputDesc = InputWithException.getInputDesc(payload);
+    InputInitializerDescriptor iiDesc =
+        InputInitializerWithException.getIIDesc(payload);
+    v1.addDataSource("input",
+        DataSourceDescriptor.create(inputDesc, iiDesc, null));
+    OutputDescriptor outputDesc = OutputWithException.getOutputDesc(payload);
+    v1.addDataSink("output", DataSinkDescriptor.create(outputDesc, null, null));
+    dag.addVertex(v1);
+    return dag;
+  }
+
+  /**
+   * InputInitializer that never throws itself; it emits a single
+   * {@link InputDataInformationEvent} for index 0 (presumably so the vertex's
+   * one task receives an input event — confirm).
+   */
+  public static class InputInitializerWithException extends InputInitializer {
+
+    // Parsed eagerly so an invalid payload fails construction; not otherwise used.
+    private final ExceptionLocation exLocation;
+
+    public InputInitializerWithException(
+        InputInitializerContext initializerContext) {
+      super(initializerContext);
+      this.exLocation = toExceptionLocation(getContext().getUserPayload());
+    }
+
+    @Override
+    public List<Event> initialize() throws Exception {
+      List<Event> events = new ArrayList<Event>();
+      events.add(InputDataInformationEvent.createWithObjectPayload(0, null));
+      return events;
+    }
+
+    @Override
+    public void handleInputInitializerEvent(List<InputInitializerEvent> events)
+        throws Exception {
+    }
+
+    public static InputInitializerDescriptor getIIDesc(UserPayload payload) {
+      return InputInitializerDescriptor.create(
+          InputInitializerWithException.class.getName())
+          .setUserPayload(payload);
+    }
+  }
+
+  /** Input that throws from the method named by its payload's INPUT_* location. */
+  public static class InputWithException extends AbstractLogicalInput {
+
+    private final ExceptionLocation exLocation;
+    private final Object condition = new Object();
+
+    public InputWithException(InputContext inputContext, int numPhysicalInputs) {
+      super(inputContext, numPhysicalInputs);
+      this.exLocation = toExceptionLocation(getContext().getUserPayload());
+    }
+
+    @Override
+    public void start() throws Exception {
+      if (this.exLocation == ExceptionLocation.INPUT_START) {
+        throw new Exception(this.exLocation.name());
+      }
+    }
+
+    /**
+     * For INPUT_HANDLE_EVENTS this blocks until the calling thread is
+     * interrupted (nothing ever notifies {@code condition}); the resulting
+     * InterruptedException propagates to the caller.
+     */
+    @Override
+    public Reader getReader() throws Exception {
+      if (this.exLocation == ExceptionLocation.INPUT_HANDLE_EVENTS) {
+        synchronized (condition) {
+          // wait for exception thrown from handleEvents. Otherwise,
+          // processor may exit before the exception from handleEvents is
+          // caught. Looping guards against spurious wakeups, which would
+          // otherwise let the processor finish early.
+          while (true) {
+            condition.wait();
+          }
+        }
+      }
+      if (this.exLocation == ExceptionLocation.INPUT_GET_READER) {
+        throw new Exception(this.exLocation.name());
+      }
+      return null;
+    }
+
+    @Override
+    public void handleEvents(List<Event> inputEvents) throws Exception {
+      if (this.exLocation == ExceptionLocation.INPUT_HANDLE_EVENTS) {
+        throw new Exception(this.exLocation.name());
+      }
+    }
+
+    @Override
+    public List<Event> close() throws Exception {
+      if (this.exLocation == ExceptionLocation.INPUT_CLOSE) {
+        throw new Exception(this.exLocation.name());
+      }
+      return null;
+    }
+
+    @Override
+    public List<Event> initialize() throws Exception {
+      getContext().requestInitialMemory(0L, null); // mandatory call
+      if (this.exLocation == ExceptionLocation.INPUT_INITIALIZE) {
+        throw new Exception(this.exLocation.name());
+      }
+      return null;
+    }
+
+    public static InputDescriptor getInputDesc(UserPayload payload) {
+      return InputDescriptor.create(InputWithException.class.getName())
+          .setUserPayload(payload);
+    }
+  }
+
+  /**
+   * Output that throws from the method named by its payload's OUTPUT_*
+   * location. close/initialize throw unchecked exceptions while start/getWriter
+   * throw checked ones.
+   */
+  public static class OutputWithException extends AbstractLogicalOutput {
+
+    private final ExceptionLocation exLocation;
+
+    public OutputWithException(OutputContext outputContext,
+        int numPhysicalOutputs) {
+      super(outputContext, numPhysicalOutputs);
+      this.exLocation = toExceptionLocation(getContext().getUserPayload());
+    }
+
+    @Override
+    public void start() throws Exception {
+      if (this.exLocation == ExceptionLocation.OUTPUT_START) {
+        throw new Exception(this.exLocation.name());
+      }
+    }
+
+    @Override
+    public Writer getWriter() throws Exception {
+      if (this.exLocation == ExceptionLocation.OUTPUT_GET_WRITER) {
+        throw new Exception(this.exLocation.name());
+      }
+      return null;
+    }
+
+    @Override
+    public void handleEvents(List<Event> outputEvents) {
+    }
+
+    @Override
+    public List<Event> close() throws Exception {
+      if (this.exLocation == ExceptionLocation.OUTPUT_CLOSE) {
+        throw new RuntimeException(this.exLocation.name());
+      }
+      return null;
+    }
+
+    @Override
+    public List<Event> initialize() throws Exception {
+      getContext().requestInitialMemory(0L, null); // mandatory call
+      if (this.exLocation == ExceptionLocation.OUTPUT_INITIALIZE) {
+        throw new RuntimeException(this.exLocation.name());
+      }
+      return null;
+    }
+
+    public static OutputDescriptor getOutputDesc(UserPayload payload) {
+      return OutputDescriptor.create(OutputWithException.class.getName())
+          .setUserPayload(payload);
+    }
+  }
+
+  /**
+   * Processor that drives start()/getReader() on the "input" and
+   * start()/getWriter() on the "output", so those failure locations are
+   * exercised, and throws itself for the PROCESSOR_* locations.
+   */
+  public static class ProcessorWithException extends AbstractLogicalIOProcessor {
+
+    private final ExceptionLocation exLocation;
+
+    public ProcessorWithException(ProcessorContext context) {
+      super(context);
+      this.exLocation = toExceptionLocation(getContext().getUserPayload());
+    }
+
+    @Override
+    public void run(Map<String, LogicalInput> inputs,
+        Map<String, LogicalOutput> outputs) throws Exception {
+      InputWithException input = (InputWithException) inputs.get("input");
+      input.start();
+      input.getReader();
+
+      OutputWithException output = (OutputWithException) outputs.get("output");
+      output.start();
+      output.getWriter();
+
+      if (this.exLocation == ExceptionLocation.PROCESSOR_RUN) {
+        throw new Exception(this.exLocation.name());
+      }
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+    }
+
+    @Override
+    public void close() throws Exception {
+      if (this.exLocation == ExceptionLocation.PROCESSOR_CLOSE) {
+        throw new RuntimeException(this.exLocation.name());
+      }
+    }
+
+    @Override
+    public void initialize() throws Exception {
+      if (this.exLocation == ExceptionLocation.PROCESSOR_INITIALIZE) {
+        throw new RuntimeException(this.exLocation.name());
+      }
+    }
+
+    public static ProcessorDescriptor getProcDesc(UserPayload payload) {
+      return ProcessorDescriptor.create(ProcessorWithException.class.getName())
+          .setUserPayload(payload);
+    }
+  }
+}