You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:18 UTC

[16/50] [abbrv] tez git commit: TEZ-2237. Valid events should be sent out when an Output is not started. (sseth)

TEZ-2237. Valid events should be sent out when an Output is not started. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e762a35f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e762a35f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e762a35f

Branch: refs/heads/TEZ-2003
Commit: e762a35fd81228f85c455b612f7cc8ff6a305e41
Parents: ba6d7e0
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon May 4 16:30:06 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon May 4 16:30:06 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../library/common/shuffle/ShuffleUtils.java    | 55 ++++++++++++++++
 .../output/OrderedPartitionedKVOutput.java      | 16 +++--
 .../library/output/UnorderedKVOutput.java       | 13 +++-
 .../output/UnorderedPartitionedKVOutput.java    | 11 +++-
 .../library/output/OutputTestHelpers.java       | 47 ++++++++++++++
 .../output/TestOrderedPartitionedKVOutput2.java | 67 ++++++++++++++++++++
 .../library/output/TestUnorderedKVOutput2.java  | 60 ++++++++++++++++++
 .../TestUnorderedPartitionedKVOutput2.java      | 62 ++++++++++++++++++
 9 files changed, 325 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0027e98..5b18258 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -161,6 +161,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2237. Valid events should be sent out when an Output is not started.
   TEZ-1988. Tez UI: does not work when using file:// in a browser
   TEZ-2390. tez-tools swimlane tool fails to parse large jobs >8K containers
   TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter

http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 9a8b6b5..46489ed 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -33,6 +33,7 @@ import javax.crypto.SecretKey;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -333,6 +334,60 @@ public class ShuffleUtils {
   }
 
   /**
+   * Generate events for outputs which have not been started.
+   * @param eventList
+   * @param numPhysicalOutputs
+   * @param context
+   * @param generateVmEvent whether to generate a vm event or not
+   * @param isCompositeEvent whether to generate a CompositeDataMovementEvent or a DataMovementEvent
+   * @throws IOException
+   */
+  public static void generateEventsForNonStartedOutput(List<Event> eventList,
+                                                       int numPhysicalOutputs,
+                                                       OutputContext context,
+                                                       boolean generateVmEvent,
+                                                       boolean isCompositeEvent) throws
+      IOException {
+    DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
+        .newBuilder();
+
+
+    // Construct the VertexManager event if required.
+    if (generateVmEvent) {
+      ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
+          ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
+      vmBuilder.setOutputSize(0);
+      VertexManagerEvent vmEvent = VertexManagerEvent.create(
+          context.getDestinationVertexName(),
+          vmBuilder.build().toByteString().asReadOnlyByteBuffer());
+      eventList.add(vmEvent);
+    }
+
+    // Construct the DataMovementEvent
+    // Always set empty partition information since no files were generated.
+    LOG.info("Setting all {} partitions as empty for non-started output", numPhysicalOutputs);
+    BitSet emptyPartitionDetails = new BitSet(numPhysicalOutputs);
+    emptyPartitionDetails.set(0, numPhysicalOutputs, true);
+    ByteString emptyPartitionsBytesString =
+        TezCommonUtils.compressByteArrayToByteString(
+            TezUtilsInternal.toByteArray(emptyPartitionDetails));
+    payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
+    payloadBuilder.setRunDuration(0);
+    DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
+    ByteBuffer dmePayload = payloadProto.toByteString().asReadOnlyByteBuffer();
+
+
+    if (isCompositeEvent) {
+      CompositeDataMovementEvent cdme =
+          CompositeDataMovementEvent.create(0, numPhysicalOutputs, dmePayload);
+      eventList.add(cdme);
+    } else {
+      DataMovementEvent dme = DataMovementEvent.create(0, dmePayload);
+      eventList.add(dme);
+    }
+  }
+
+  /**
    * Generate events when spill happens
    *
    * @param eventList events would be added to this list

http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 40edc76..6227fb9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -185,11 +185,13 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
       this.endTime = System.nanoTime();
       returnEvents = generateEvents();
     } else {
-      LOG.warn("Attempting to close output " + getContext().getDestinationVertexName()
-          + " before it was started");
-      returnEvents = Collections.emptyList();
+      LOG.warn(
+          "Attempting to close output {} of type {} before it was started. Generating empty events",
+          getContext().getDestinationVertexName(), this.getClass().getSimpleName());
+      returnEvents = generateEmptyEvents();
     }
-    
+
+    // This works for non-started outputs since new counters will be created with an initial value of 0
     long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
     getContext().getStatisticsReporter().reportDataSize(outputSize);
     long outputRecords = getContext().getCounters()
@@ -210,6 +212,12 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
     return eventList;
   }
 
+  private List<Event> generateEmptyEvents() throws IOException {
+    List<Event> eventList = Lists.newLinkedList();
+    ShuffleUtils.generateEventsForNonStartedOutput(eventList, getNumPhysicalOutputs(), getContext(), true, true);
+    return eventList;
+  }
+
 
   private static final Set<String> confKeys = new HashSet<String>();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 2c26374..08e6ec0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -20,10 +20,12 @@ package org.apache.tez.runtime.library.output;
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -125,9 +127,16 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
       //TODO: Do we need to support sending payloads via events?
       returnEvents = kvWriter.close();
     } else {
-      returnEvents = Collections.emptyList();
+      LOG.warn(
+          "Attempting to close output {} of type {} before it was started. Generating empty events",
+          getContext().getDestinationVertexName(), this.getClass().getSimpleName());
+      returnEvents = new LinkedList<Event>();
+      ShuffleUtils
+          .generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(),
+              false, false);
     }
-    
+
+    // This works for non-started outputs since new counters will be created with an initial value of 0
     long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
     getContext().getStatisticsReporter().reportDataSize(outputSize);
     long outputRecords = getContext().getCounters()

http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 34f2e3e..38450ee 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -20,12 +20,14 @@ package org.apache.tez.runtime.library.output;
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -102,9 +104,16 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     if (isStarted.get()) {
       returnEvents = kvWriter.close();
     } else {
-      returnEvents = Collections.emptyList();
+      LOG.warn(
+          "Attempting to close output {} of type {} before it was started. Generating empty events",
+          getContext().getDestinationVertexName(), this.getClass().getSimpleName());
+      returnEvents = new LinkedList<Event>();
+      ShuffleUtils
+          .generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(),
+              false, true);
     }
 
+    // This works for non-started outputs since new counters will be created with an initial value of 0
     long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
     getContext().getStatisticsReporter().reportDataSize(outputSize);
     long outputRecords = getContext().getCounters()

http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
new file mode 100644
index 0000000..db9a0ed
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.OutputStatisticsReporter;
+
+public class OutputTestHelpers {
+  static OutputContext createOutputContext() throws IOException {
+    OutputContext outputContext = mock(OutputContext.class);
+    Configuration conf = new TezConfiguration();
+    UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+    String[] workingDirs = new String[]{"workDir1"};
+    OutputStatisticsReporter statsReporter = mock(OutputStatisticsReporter.class);
+    TezCounters counters = new TezCounters();
+
+    doReturn("destinationVertex").when(outputContext).getDestinationVertexName();
+    doReturn(payLoad).when(outputContext).getUserPayload();
+    doReturn(workingDirs).when(outputContext).getWorkDirs();
+    doReturn(200 * 1024 * 1024l).when(outputContext).getTotalMemoryAvailableToTask();
+    doReturn(counters).when(outputContext).getCounters();
+    doReturn(statsReporter).when(outputContext).getStatisticsReporter();
+    return outputContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
new file mode 100644
index 0000000..8e76a8b
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.Test;
+
+// Tests which don't require parameterization
+public class TestOrderedPartitionedKVOutput2 {
+
+
+  @Test(timeout = 5000)
+  public void testNonStartedOutput() throws IOException {
+    OutputContext outputContext = OutputTestHelpers.createOutputContext();
+    int numPartitions = 10;
+    OrderedPartitionedKVOutput output = new OrderedPartitionedKVOutput(outputContext, numPartitions);
+    output.initialize();
+    List<Event> events = output.close();
+    assertEquals(2, events.size());
+    Event event1 = events.get(0);
+    assertTrue(event1 instanceof VertexManagerEvent);
+    Event event2 = events.get(1);
+    assertTrue(event2 instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) event2;
+    ByteBuffer bb = cdme.getUserPayload();
+    ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+        ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+    assertTrue(shufflePayload.hasEmptyPartitions());
+
+    byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+        .getEmptyPartitions());
+    BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+    assertEquals(numPartitions, emptyPartionsBitSet.cardinality());
+    for (int i = 0 ; i < numPartitions ; i++) {
+      assertTrue(emptyPartionsBitSet.get(i));
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
new file mode 100644
index 0000000..ecc1241
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.Test;
+
+// Tests which don't require parameterization
+public class TestUnorderedKVOutput2 {
+
+  @Test(timeout = 5000)
+  public void testNonStartedOutput() throws Exception {
+    OutputContext outputContext = OutputTestHelpers.createOutputContext();
+    int numPartitions = 1;
+    UnorderedKVOutput output = new UnorderedKVOutput(outputContext, numPartitions);
+    output.initialize();
+    List<Event> events = output.close();
+    assertEquals(1, events.size());
+    Event event1 = events.get(0);
+    assertTrue(event1 instanceof DataMovementEvent);
+    DataMovementEvent dme = (DataMovementEvent) event1;
+    ByteBuffer bb = dme.getUserPayload();
+    ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+        ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+    assertTrue(shufflePayload.hasEmptyPartitions());
+
+    byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+        .getEmptyPartitions());
+    BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+    assertEquals(numPartitions, emptyPartionsBitSet.cardinality());
+    for (int i = 0 ; i < numPartitions ; i++) {
+      assertTrue(emptyPartionsBitSet.get(i));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
new file mode 100644
index 0000000..eec4bf5
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.Test;
+
+// Tests which don't require parameterization
+public class TestUnorderedPartitionedKVOutput2 {
+
+
+  @Test(timeout = 5000)
+  public void testNonStartedOutput() throws Exception {
+    OutputContext outputContext = OutputTestHelpers.createOutputContext();
+    int numPartitions = 1;
+    UnorderedPartitionedKVOutput output =
+        new UnorderedPartitionedKVOutput(outputContext, numPartitions);
+    output.initialize();
+    List<Event> events = output.close();
+    assertEquals(1, events.size());
+    Event event1 = events.get(0);
+    assertTrue(event1 instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent dme = (CompositeDataMovementEvent) event1;
+    ByteBuffer bb = dme.getUserPayload();
+    ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+        ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+    assertTrue(shufflePayload.hasEmptyPartitions());
+
+    byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+        .getEmptyPartitions());
+    BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+    assertEquals(numPartitions, emptyPartionsBitSet.cardinality());
+    for (int i = 0; i < numPartitions; i++) {
+      assertTrue(emptyPartionsBitSet.get(i));
+    }
+  }
+}