You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/05 01:30:30 UTC
tez git commit: TEZ-2237. Valid events should be sent out when an
Output is not started. (sseth)
Repository: tez
Updated Branches:
refs/heads/master ba6d7e0e9 -> e762a35fd
TEZ-2237. Valid events should be sent out when an Output is not started. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e762a35f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e762a35f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e762a35f
Branch: refs/heads/master
Commit: e762a35fd81228f85c455b612f7cc8ff6a305e41
Parents: ba6d7e0
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon May 4 16:30:06 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon May 4 16:30:06 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../library/common/shuffle/ShuffleUtils.java | 55 ++++++++++++++++
.../output/OrderedPartitionedKVOutput.java | 16 +++--
.../library/output/UnorderedKVOutput.java | 13 +++-
.../output/UnorderedPartitionedKVOutput.java | 11 +++-
.../library/output/OutputTestHelpers.java | 47 ++++++++++++++
.../output/TestOrderedPartitionedKVOutput2.java | 67 ++++++++++++++++++++
.../library/output/TestUnorderedKVOutput2.java | 60 ++++++++++++++++++
.../TestUnorderedPartitionedKVOutput2.java | 62 ++++++++++++++++++
9 files changed, 325 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0027e98..5b18258 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -161,6 +161,7 @@ Release 0.6.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2237. Valid events should be sent out when an Output is not started.
TEZ-1988. Tez UI: does not work when using file:// in a browser
TEZ-2390. tez-tools swimlane tool fails to parse large jobs >8K containers
TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter
http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 9a8b6b5..46489ed 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -33,6 +33,7 @@ import javax.crypto.SecretKey;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -333,6 +334,60 @@ public class ShuffleUtils {
}
/**
+ * Generate events for outputs which have not been started.
+ * @param eventList
+ * @param numPhysicalOutputs
+ * @param context
+ * @param generateVmEvent whether to generate a vm event or not
+ * @param isCompositeEvent whether to generate a CompositeDataMovementEvent or a DataMovementEvent
+ * @throws IOException
+ */
+ public static void generateEventsForNonStartedOutput(List<Event> eventList,
+ int numPhysicalOutputs,
+ OutputContext context,
+ boolean generateVmEvent,
+ boolean isCompositeEvent) throws
+ IOException {
+ DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
+ .newBuilder();
+
+
+ // Construct the VertexManager event if required.
+ if (generateVmEvent) {
+ ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
+ ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
+ vmBuilder.setOutputSize(0);
+ VertexManagerEvent vmEvent = VertexManagerEvent.create(
+ context.getDestinationVertexName(),
+ vmBuilder.build().toByteString().asReadOnlyByteBuffer());
+ eventList.add(vmEvent);
+ }
+
+ // Construct the DataMovementEvent
+ // Always set empty partition information since no files were generated.
+ LOG.info("Setting all {} partitions as empty for non-started output", numPhysicalOutputs);
+ BitSet emptyPartitionDetails = new BitSet(numPhysicalOutputs);
+ emptyPartitionDetails.set(0, numPhysicalOutputs, true);
+ ByteString emptyPartitionsBytesString =
+ TezCommonUtils.compressByteArrayToByteString(
+ TezUtilsInternal.toByteArray(emptyPartitionDetails));
+ payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
+ payloadBuilder.setRunDuration(0);
+ DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
+ ByteBuffer dmePayload = payloadProto.toByteString().asReadOnlyByteBuffer();
+
+
+ if (isCompositeEvent) {
+ CompositeDataMovementEvent cdme =
+ CompositeDataMovementEvent.create(0, numPhysicalOutputs, dmePayload);
+ eventList.add(cdme);
+ } else {
+ DataMovementEvent dme = DataMovementEvent.create(0, dmePayload);
+ eventList.add(dme);
+ }
+ }
+
+ /**
* Generate events when spill happens
*
* @param eventList events would be added to this list
http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 40edc76..6227fb9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -185,11 +185,13 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
this.endTime = System.nanoTime();
returnEvents = generateEvents();
} else {
- LOG.warn("Attempting to close output " + getContext().getDestinationVertexName()
- + " before it was started");
- returnEvents = Collections.emptyList();
+ LOG.warn(
+ "Attempting to close output {} of type {} before it was started. Generating empty events",
+ getContext().getDestinationVertexName(), this.getClass().getSimpleName());
+ returnEvents = generateEmptyEvents();
}
-
+
+ // This works for non-started outputs since new counters will be created with an initial value of 0
long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
getContext().getStatisticsReporter().reportDataSize(outputSize);
long outputRecords = getContext().getCounters()
@@ -210,6 +212,12 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
return eventList;
}
+ private List<Event> generateEmptyEvents() throws IOException {
+ List<Event> eventList = Lists.newLinkedList();
+ ShuffleUtils.generateEventsForNonStartedOutput(eventList, getNumPhysicalOutputs(), getContext(), true, true);
+ return eventList;
+ }
+
private static final Set<String> confKeys = new HashSet<String>();
http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 2c26374..08e6ec0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -20,10 +20,12 @@ package org.apache.tez.runtime.library.output;
import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -125,9 +127,16 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
//TODO: Do we need to support sending payloads via events?
returnEvents = kvWriter.close();
} else {
- returnEvents = Collections.emptyList();
+ LOG.warn(
+ "Attempting to close output {} of type {} before it was started. Generating empty events",
+ getContext().getDestinationVertexName(), this.getClass().getSimpleName());
+ returnEvents = new LinkedList<Event>();
+ ShuffleUtils
+ .generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(),
+ false, false);
}
-
+
+ // This works for non-started outputs since new counters will be created with an initial value of 0
long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
getContext().getStatisticsReporter().reportDataSize(outputSize);
long outputRecords = getContext().getCounters()
http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 34f2e3e..38450ee 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -20,12 +20,14 @@ package org.apache.tez.runtime.library.output;
import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Preconditions;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -102,9 +104,16 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
if (isStarted.get()) {
returnEvents = kvWriter.close();
} else {
- returnEvents = Collections.emptyList();
+ LOG.warn(
+ "Attempting to close output {} of type {} before it was started. Generating empty events",
+ getContext().getDestinationVertexName(), this.getClass().getSimpleName());
+ returnEvents = new LinkedList<Event>();
+ ShuffleUtils
+ .generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(),
+ false, true);
}
+ // This works for non-started outputs since new counters will be created with an initial value of 0
long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
getContext().getStatisticsReporter().reportDataSize(outputSize);
long outputRecords = getContext().getCounters()
http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
new file mode 100644
index 0000000..db9a0ed
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.OutputStatisticsReporter;
+
+public class OutputTestHelpers {
+ static OutputContext createOutputContext() throws IOException {
+ OutputContext outputContext = mock(OutputContext.class);
+ Configuration conf = new TezConfiguration();
+ UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+ String[] workingDirs = new String[]{"workDir1"};
+ OutputStatisticsReporter statsReporter = mock(OutputStatisticsReporter.class);
+ TezCounters counters = new TezCounters();
+
+ doReturn("destinationVertex").when(outputContext).getDestinationVertexName();
+ doReturn(payLoad).when(outputContext).getUserPayload();
+ doReturn(workingDirs).when(outputContext).getWorkDirs();
+ doReturn(200 * 1024 * 1024l).when(outputContext).getTotalMemoryAvailableToTask();
+ doReturn(counters).when(outputContext).getCounters();
+ doReturn(statsReporter).when(outputContext).getStatisticsReporter();
+ return outputContext;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
new file mode 100644
index 0000000..8e76a8b
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOrderedPartitionedKVOutput2.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.Test;
+
+// Tests which don't require parameterization
+public class TestOrderedPartitionedKVOutput2 {
+
+
+ @Test(timeout = 5000)
+ public void testNonStartedOutput() throws IOException {
+ OutputContext outputContext = OutputTestHelpers.createOutputContext();
+ int numPartitions = 10;
+ OrderedPartitionedKVOutput output = new OrderedPartitionedKVOutput(outputContext, numPartitions);
+ output.initialize();
+ List<Event> events = output.close();
+ assertEquals(2, events.size());
+ Event event1 = events.get(0);
+ assertTrue(event1 instanceof VertexManagerEvent);
+ Event event2 = events.get(1);
+ assertTrue(event2 instanceof CompositeDataMovementEvent);
+ CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) event2;
+ ByteBuffer bb = cdme.getUserPayload();
+ ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+ ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+ assertTrue(shufflePayload.hasEmptyPartitions());
+
+ byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+ .getEmptyPartitions());
+ BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+ assertEquals(numPartitions, emptyPartionsBitSet.cardinality());
+ for (int i = 0 ; i < numPartitions ; i++) {
+ assertTrue(emptyPartionsBitSet.get(i));
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
new file mode 100644
index 0000000..ecc1241
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedKVOutput2.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.Test;
+
+// Tests which don't require parameterization
+public class TestUnorderedKVOutput2 {
+
+ @Test(timeout = 5000)
+ public void testNonStartedOutput() throws Exception {
+ OutputContext outputContext = OutputTestHelpers.createOutputContext();
+ int numPartitions = 1;
+ UnorderedKVOutput output = new UnorderedKVOutput(outputContext, numPartitions);
+ output.initialize();
+ List<Event> events = output.close();
+ assertEquals(1, events.size());
+ Event event1 = events.get(0);
+ assertTrue(event1 instanceof DataMovementEvent);
+ DataMovementEvent dme = (DataMovementEvent) event1;
+ ByteBuffer bb = dme.getUserPayload();
+ ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+ ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+ assertTrue(shufflePayload.hasEmptyPartitions());
+
+ byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+ .getEmptyPartitions());
+ BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+ assertEquals(numPartitions, emptyPartionsBitSet.cardinality());
+ for (int i = 0 ; i < numPartitions ; i++) {
+ assertTrue(emptyPartionsBitSet.get(i));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e762a35f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
new file mode 100644
index 0000000..eec4bf5
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestUnorderedPartitionedKVOutput2.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.Test;
+
+// Tests which don't require parameterization
+public class TestUnorderedPartitionedKVOutput2 {
+
+
+ @Test(timeout = 5000)
+ public void testNonStartedOutput() throws Exception {
+ OutputContext outputContext = OutputTestHelpers.createOutputContext();
+ int numPartitions = 1;
+ UnorderedPartitionedKVOutput output =
+ new UnorderedPartitionedKVOutput(outputContext, numPartitions);
+ output.initialize();
+ List<Event> events = output.close();
+ assertEquals(1, events.size());
+ Event event1 = events.get(0);
+ assertTrue(event1 instanceof CompositeDataMovementEvent);
+ CompositeDataMovementEvent dme = (CompositeDataMovementEvent) event1;
+ ByteBuffer bb = dme.getUserPayload();
+ ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+ ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+ assertTrue(shufflePayload.hasEmptyPartitions());
+
+ byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+ .getEmptyPartitions());
+ BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+ assertEquals(numPartitions, emptyPartionsBitSet.cardinality());
+ for (int i = 0; i < numPartitions; i++) {
+ assertTrue(emptyPartionsBitSet.get(i));
+ }
+ }
+}