You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/06/12 09:35:07 UTC

[incubator-uniffle] branch master updated: [#855] feat(tez): Support Tez Output UnorderedPartitionedKVOutput (#943)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dbf281f [#855] feat(tez): Support Tez Output UnorderedPartitionedKVOutput (#943)
2dbf281f is described below

commit 2dbf281fb30deb29e6a9dd5a5e6d0758a31651a4
Author: bin41215 <74...@users.noreply.github.com>
AuthorDate: Mon Jun 12 17:35:02 2023 +0800

    [#855] feat(tez): Support Tez Output UnorderedPartitionedKVOutput (#943)
    
    ### What changes were proposed in this pull request?
    
    Support writing Tez UnorderedPartitionedKVOutput data through the remote shuffle service.
    
    ### Why are the changes needed?
    
    Fix: #855
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Unit tests (RssUnSorterTest, RssUnorderedPartitionedKVOutputTest).
    
    Co-authored-by: bin3.zhang <bi...@huolala.cn>
---
 .../library/common/sort/impl/RssUnSorter.java      | 196 ++++++++++++++
 .../output/RssUnorderedPartitionedKVOutput.java    | 293 +++++++++++++++++++++
 .../library/common/sort/impl/RssUnSorterTest.java  | 104 ++++++++
 .../RssUnorderedPartitionedKVOutputTest.java       | 104 ++++++++
 4 files changed, 697 insertions(+)

diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
new file mode 100644
index 00000000..601cb240
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.IdUtils;
+import org.apache.tez.common.RssTezConfig;
+import org.apache.tez.common.RssTezUtils;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.common.sort.buffer.WriteBufferManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ByteUnit;
+import org.apache.uniffle.storage.util.StorageType;
+
+/**{@link RssUnSorter} is an {@link ExternalSorter}
+ */
+public class RssUnSorter extends ExternalSorter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssUnSorter.class);
+  private WriteBufferManager bufferManager;
+  private Set<Long> successBlockIds = Sets.newConcurrentHashSet();
+  private Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
+  private Map<Integer, List<ShuffleServerInfo>> partitionToServers;
+  private int[] numRecordsPerPartition;
+
+  /**
+   * Initialization
+   */
+  public RssUnSorter(TezTaskAttemptID tezTaskAttemptID, OutputContext outputContext, Configuration conf,
+        int numMaps, int numOutputs, long initialMemoryAvailable, int shuffleId,
+        Map<Integer, List<ShuffleServerInfo>> partitionToServers) throws IOException {
+    super(outputContext, conf, numOutputs, initialMemoryAvailable);
+    this.partitionToServers = partitionToServers;
+
+    this.numRecordsPerPartition = new int[numOutputs];
+
+    long sortmb = conf.getLong(RssTezConfig.RSS_RUNTIME_IO_SORT_MB, RssTezConfig.RSS_DEFAULT_RUNTIME_IO_SORT_MB);
+    LOG.info("conf.sortmb is {}", sortmb);
+    sortmb = this.availableMemoryMb;
+    LOG.info("sortmb, availableMemoryMb is {}, {}", sortmb, availableMemoryMb);
+    if ((sortmb & 0x7FF) != sortmb) {
+      throw new IOException(
+        "Invalid \"" + RssTezConfig.RSS_RUNTIME_IO_SORT_MB + "\": " + sortmb);
+    }
+    double sortThreshold = conf.getDouble(RssTezConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
+        RssTezConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
+    long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID, IdUtils.getAppAttemptId());
+    long maxSegmentSize = conf.getLong(RssTezConfig.RSS_CLIENT_MAX_BUFFER_SIZE,
+        RssTezConfig.RSS_CLIENT_DEFAULT_MAX_BUFFER_SIZE);
+    long maxBufferSize = conf.getLong(RssTezConfig.RSS_WRITER_BUFFER_SIZE, RssTezConfig.RSS_DEFAULT_WRITER_BUFFER_SIZE);
+    double memoryThreshold = conf.getDouble(RssTezConfig.RSS_CLIENT_MEMORY_THRESHOLD,
+        RssTezConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
+    double sendThreshold = conf.getDouble(RssTezConfig.RSS_CLIENT_SEND_THRESHOLD,
+        RssTezConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
+    int batch = conf.getInt(RssTezConfig.RSS_CLIENT_BATCH_TRIGGER_NUM,
+        RssTezConfig.RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM);
+    String storageType = conf.get(RssTezConfig.RSS_STORAGE_TYPE, RssTezConfig.RSS_DEFAULT_STORAGE_TYPE);
+    if (StringUtils.isEmpty(storageType)) {
+      throw new RssException("storage type mustn't be empty");
+    }
+    long sendCheckInterval = conf.getLong(RssTezConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS,
+        RssTezConfig.RSS_CLIENT_DEFAULT_SEND_CHECK_INTERVAL_MS);
+    long sendCheckTimeout = conf.getLong(RssTezConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
+        RssTezConfig.RSS_CLIENT_DEFAULT_SEND_CHECK_TIMEOUT_MS);
+    int bitmapSplitNum = conf.getInt(RssTezConfig.RSS_CLIENT_BITMAP_NUM, RssTezConfig.RSS_CLIENT_DEFAULT_BITMAP_NUM);
+
+    if (conf.get(RssTezConfig.HIVE_TEZ_LOG_LEVEL, RssTezConfig.DEFAULT_HIVE_TEZ_LOG_LEVEL)
+        .equalsIgnoreCase(RssTezConfig.DEBUG_HIVE_TEZ_LOG_LEVEL)) {
+      LOG.info("sortmb is {}", sortmb);
+      LOG.info("sortThreshold is {}", sortThreshold);
+      LOG.info("taskAttemptId is {}", taskAttemptId);
+      LOG.info("maxSegmentSize is {}", maxSegmentSize);
+      LOG.info("maxBufferSize is {}", maxBufferSize);
+      LOG.info("memoryThreshold is {}", memoryThreshold);
+      LOG.info("sendThreshold is {}", sendThreshold);
+      LOG.info("batch is {}", batch);
+      LOG.info("storageType is {}", storageType);
+      LOG.info("sendCheckInterval is {}", sendCheckInterval);
+      LOG.info("sendCheckTimeout is {}", sendCheckTimeout);
+      LOG.info("bitmapSplitNum is {}", bitmapSplitNum);
+    }
+
+    String containerIdStr =
+        System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
+    LOG.info("containerIdStr is {}", containerIdStr);
+    LOG.info("containerId is {}", containerId);
+    LOG.info("applicationAttemptId is {}", applicationAttemptId.toString());
+
+    bufferManager = new WriteBufferManager(
+        tezTaskAttemptID,
+        (long)(ByteUnit.MiB.toBytes(sortmb) * sortThreshold),
+        applicationAttemptId.toString(),
+        taskAttemptId,
+        successBlockIds,
+        failedBlockIds,
+        RssTezUtils.createShuffleClient(conf),
+        comparator,
+        maxSegmentSize,
+        keySerializer,
+        valSerializer,
+        maxBufferSize,
+        memoryThreshold,
+        sendThreshold,
+        batch,
+        new RssConf(),
+        partitionToServers,
+        numMaps,
+        isMemoryShuffleEnabled(storageType),
+        sendCheckInterval,
+        sendCheckTimeout,
+        bitmapSplitNum,
+        shuffleId,
+        false);
+    LOG.info("Initialized WriteBufferManager.");
+  }
+
+  @Override
+  public void flush() throws IOException {
+    bufferManager.waitSendFinished();
+  }
+
+  @Override
+  public final void close() throws IOException {
+    super.close();
+    bufferManager.freeAllResources();
+  }
+
+  @Override
+  public void write(Object key, Object value) throws IOException {
+    try {
+      collect(key, value, partitioner.getPartition(key, value, partitions));
+    } catch (InterruptedException e) {
+      throw new RssException(e);
+    }
+  }
+
+  synchronized void collect(Object key, Object value, final int partition) throws IOException, InterruptedException {
+    if (key.getClass() != keyClass) {
+      throw new IOException("Type mismatch in key from map: expected "
+        + keyClass.getName() + ", received "
+        + key.getClass().getName());
+    }
+    if (value.getClass() != valClass) {
+      throw new IOException("Type mismatch in value from map: expected "
+        + valClass.getName() + ", received "
+        + value.getClass().getName());
+    }
+    if (partition < 0 || partition >= partitions) {
+      throw new IOException("Illegal partition for " + key + " (" + partition + ")");
+    }
+    bufferManager.addRecord(partition, key, value);
+    numRecordsPerPartition[partition]++;
+  }
+
+  public int[] getNumRecordsPerPartition() {
+    return numRecordsPerPartition;
+  }
+
+  private boolean isMemoryShuffleEnabled(String storageType) {
+    return StorageType.withMemory(StorageType.valueOf(storageType));
+  }
+}
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
new file mode 100644
index 00000000..d0fb0788
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.GetShuffleServerRequest;
+import org.apache.tez.common.GetShuffleServerResponse;
+import org.apache.tez.common.RssTezConfig;
+import org.apache.tez.common.RssTezUtils;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.library.api.KeyValuesWriter;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
+import org.apache.tez.runtime.library.common.sort.impl.RssTezPerPartitionRecord;
+import org.apache.tez.runtime.library.common.sort.impl.RssUnSorter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+
+/**
+ * {@link RssUnorderedPartitionedKVOutput} is an {@link AbstractLogicalOutput} that writes
+ * unordered, partitioned key/value pairs through the remote shuffle service. During
+ * {@link #initialize()} it asks the AM, over {@link TezRemoteShuffleUmbilicalProtocol}, for the
+ * shuffle servers assigned to each partition. {@link #start()} then creates an
+ * {@link RssUnSorter} that records are written through.
+ */
+@Public
+public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
+  private static final Logger LOG = LoggerFactory.getLogger(RssUnorderedPartitionedKVOutput.class);
+  protected ExternalSorter sorter;
+
+  protected Configuration conf;
+  protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  private long startTime;
+  private long endTime;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final Deflater deflater;
+  // Shuffle servers assigned to each partition by the AM; populated in initialize().
+  private Map<Integer, List<ShuffleServerInfo>> partitionToServers;
+  private int mapNum;
+  private int numOutputs;
+  private TezTaskAttemptID taskAttemptId;
+  private ApplicationId applicationId;
+  private boolean sendEmptyPartitionDetails;
+  private OutputContext outputContext;
+  // AM shuffle manager address, read from the RSS conf file by getRssConf().
+  private String host;
+  private int port;
+  private String taskVertexName;
+  private String destinationVertexName;
+  private int shuffleId;
+
+  /**
+   * Captures task and vertex identity from the output context.
+   *
+   * @param outputContext Tez output context
+   * @param numPhysicalOutputs number of partitions this output writes
+   */
+  public RssUnorderedPartitionedKVOutput(OutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+    this.outputContext = outputContext;
+    this.deflater = TezCommonUtils.newBestCompressionDeflater();
+    this.numOutputs = getNumPhysicalOutputs();
+    this.mapNum = outputContext.getVertexParallelism();
+    this.applicationId = outputContext.getApplicationId();
+    this.taskAttemptId = TezTaskAttemptID.fromString(
+      RssTezUtils.uniqueIdentifierToAttemptId(outputContext.getUniqueIdentifier()));
+    this.taskVertexName = outputContext.getTaskVertexName();
+    this.destinationVertexName = outputContext.getDestinationVertexName();
+    LOG.info("taskAttemptId is {}", taskAttemptId.toString());
+    LOG.info("taskVertexName is {}", taskVertexName);
+    LOG.info("destinationVertexName is {}", destinationVertexName);
+    LOG.info("Initialized RssUnOrderedPartitionedKVOutput.");
+  }
+
+  /**
+   * Reads the AM shuffle manager host and port from {@link RssTezConfig#RSS_CONF_FILE} into
+   * {@link #host} and {@link #port}. Failures are logged rather than thrown; missing keys fall back
+   * to "null host" and -1.
+   */
+  private void getRssConf() {
+    try {
+      JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
+      this.host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null host");
+      this.port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
+
+      LOG.info("Got RssConf am info : host is {}, port is {}", host, port);
+    } catch (Exception e) {
+      LOG.warn("Failed to read the AM shuffle manager address from {}", RssTezConfig.RSS_CONF_FILE, e);
+    }
+  }
+
+  /**
+   * Requests memory for this output, reads the AM shuffle manager address and fetches the
+   * shuffle-server assignment for every partition from the AM.
+   *
+   * @return an empty list; no events are produced at initialization
+   * @throws IllegalStateException if the AM shuffle manager address could not be determined
+   */
+  @Override
+  public List<Event> initialize() throws Exception {
+    this.startTime = System.nanoTime();
+    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+
+    long memRequestSize = RssTezUtils.getInitialMemoryRequirement(conf, getContext().getTotalMemoryAvailableToTask());
+    LOG.info("memRequestSize is {}", memRequestSize);
+    getContext().requestInitialMemory(memRequestSize, memoryUpdateCallbackHandler);
+    LOG.info("Got initialMemory.");
+
+    getRssConf();
+    // getRssConf() only logs failures, so fail here with a clear message instead of deep inside
+    // socket-address creation.
+    if (host == null || port < 0) {
+      throw new IllegalStateException("AM shuffle manager address is not available: host is "
+          + host + ", port is " + port);
+    }
+
+    this.sendEmptyPartitionDetails = conf.getBoolean(
+      TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+      TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
+
+    final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port);
+
+    UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(this.applicationId.toString());
+    TezRemoteShuffleUmbilicalProtocol umbilical = taskOwner
+        .doAs(new PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
+          @Override
+          public TezRemoteShuffleUmbilicalProtocol run() throws Exception {
+            return RPC.getProxy(TezRemoteShuffleUmbilicalProtocol.class,
+              TezRemoteShuffleUmbilicalProtocol.versionID,
+              address,
+              conf);
+          }
+        });
+    try {
+      TezVertexID tezVertexID = taskAttemptId.getTaskID().getVertexID();
+      TezDAGID tezDAGID = tezVertexID.getDAGId();
+      this.shuffleId = RssTezUtils.computeShuffleId(tezDAGID.getId(), this.taskVertexName,
+          this.destinationVertexName);
+      GetShuffleServerRequest request = new GetShuffleServerRequest(this.taskAttemptId, this.mapNum,
+          this.numOutputs, this.shuffleId);
+      GetShuffleServerResponse response = umbilical.getShuffleAssignments(request);
+
+      this.partitionToServers = response.getShuffleAssignmentsInfoWritable()
+          .getShuffleAssignmentsInfo().getPartitionToServers();
+    } finally {
+      // The proxy is only used for this single request; release it once the request completes.
+      RPC.stopProxy(umbilical);
+    }
+
+    LOG.info("Got response from am.");
+    return Collections.emptyList();
+  }
+
+  /** Incoming events are ignored by this output. */
+  @Override
+  public void handleEvents(List<Event> list) {
+
+  }
+
+  /**
+   * Flushes and closes the sorter and returns the shuffle events describing this output. If the
+   * output was never started, events reporting every partition as empty are returned instead.
+   */
+  @Override
+  public List<Event> close() throws Exception {
+    List<Event> returnEvents;
+    if (sorter != null) {
+      try {
+        sorter.flush();
+      } finally {
+        // Close even when the flush fails so the sorter's resources are released.
+        sorter.close();
+      }
+      this.endTime = System.nanoTime();
+      returnEvents = generateEvents();
+      sorter = null;
+    } else {
+      LOG.warn("{}: Attempting to close output {} of type {} before it was started. "
+          + "Generating empty events",
+          getContext().getDestinationVertexName(),
+          getContext().getDestinationVertexName(), this.getClass().getSimpleName());
+      returnEvents = generateEmptyEvents();
+    }
+    LOG.info("RssUnorderedPartitionedKVOutput close.");
+    return returnEvents;
+  }
+
+  /**
+   * Creates the {@link RssUnSorter} once the memory assignment has been received. Later calls
+   * are no-ops.
+   */
+  @Override
+  public void start() throws Exception {
+    if (!isStarted.get()) {
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      sorter = new RssUnSorter(taskAttemptId, getContext(), conf, mapNum, numOutputs,
+          memoryUpdateCallbackHandler.getMemoryAssigned(), shuffleId,
+          partitionToServers);
+      LOG.info("Initialized RssUnSorter.");
+      isStarted.set(true);
+    }
+  }
+
+  /**
+   * Returns a writer that forwards every key/value pair to the sorter.
+   *
+   * @throws IllegalStateException if {@link #start()} has not been called
+   */
+  @Override
+  public Writer getWriter() throws IOException {
+    Preconditions.checkState(isStarted.get(), "Cannot get writer before starting the Output");
+    return new KeyValuesWriter() {
+      @Override
+      public void write(Object key, Iterable<Object> values) throws IOException {
+        sorter.write(key, values);
+      }
+
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        sorter.write(key, value);
+      }
+    };
+  }
+
+  /**
+   * Builds the events for a started output from the sorter's per-partition record counts,
+   * reported as a single final spill (spill id 0).
+   */
+  private List<Event> generateEvents() throws IOException {
+    List<Event> eventList = Lists.newLinkedList();
+    boolean isLastEvent = true;
+
+    String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
+    int[] numRecordsPerPartition = ((RssUnSorter) sorter).getNumRecordsPerPartition();
+
+    RssTezPerPartitionRecord rssTezSpillRecord = new RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+
+    LOG.info("RssTezPerPartitionRecord is initialized");
+
+    // NOTE(review): the second argument is presumably Tez's finalMergeEnabled flag — confirm
+    // against ShuffleUtils.generateEventOnSpill.
+    ShuffleUtils.generateEventOnSpill(eventList, true, isLastEvent,
+        getContext(), 0, rssTezSpillRecord,
+        getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(),
+        sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), auxiliaryService, deflater);
+    LOG.info("Generate events.");
+    return eventList;
+  }
+
+  /** Builds the events for an output that was closed without being started. */
+  private List<Event> generateEmptyEvents() throws IOException {
+    List<Event> eventList = Lists.newArrayList();
+    ShuffleUtils.generateEventsForNonStartedOutput(eventList,
+        getNumPhysicalOutputs(),
+        getContext(),
+        true,
+        true,
+        deflater);
+    LOG.info("Generate empty events.");
+    return eventList;
+  }
+
+  // Configuration keys recognized by this output; exposed via getConfigurationKeySet().
+  private static final Set<String> confKeys = new HashSet<String>();
+
+  static {
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
+    confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
+    confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
+    confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
+    confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS);
+    confKeys.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID);
+    confKeys.add(
+        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT);
+  }
+
+  /** Returns an unmodifiable view of the configuration keys recognized by this output. */
+  @InterfaceAudience.Private
+  public static Set<String> getConfigurationKeySet() {
+    return Collections.unmodifiableSet(confKeys);
+  }
+
+}
+
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorterTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorterTest.java
new file mode 100644
index 00000000..af1353f2
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorterTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.output.OutputTestHelpers;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link RssUnSorter}.
+ */
+public class RssUnSorterTest {
+  private static Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
+  private Configuration conf;
+  private FileSystem localFs;
+  private Path workingDir;
+
+  /**
+   * Builds a configuration with Text keys/values and a hash partitioner. Also sets the
+   * CONTAINER_ID environment variable, which RssUnSorter reads during construction.
+   */
+  @BeforeEach
+  public void setup() throws Exception {
+    conf = new Configuration();
+    localFs = FileSystem.getLocal(conf);
+    // Name the directory after this class so it does not collide with RssSorterTest's directory.
+    workingDir = new Path(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir", "/tmp")),
+        RssUnSorterTest.class.getName()).makeQualified(
+        localFs.getUri(), localFs.getWorkingDirectory());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+        HashPartitioner.class.getName());
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
+
+    Map<String, String> env = new HashMap<>(System.getenv());
+    env.put(ApplicationConstants.Environment.CONTAINER_ID.name(), "container_e160_1681717153064_3770270_01_000001");
+
+    RssSorterTest.setEnv(env);
+  }
+
+  /**
+   * Collects six records over five partitions and checks the per-partition counts.
+   */
+  @Test
+  public void testCollectAndRecordsPerPartition() throws IOException, InterruptedException {
+    TezTaskAttemptID tezTaskAttemptID =
+        TezTaskAttemptID.fromString("attempt_1681717153064_3770270_1_00_000000_0");
+
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+
+    long initialMemoryAvailable = 10240000;
+    int shuffleId = 1001;
+
+    RssUnSorter rssSorter = new RssUnSorter(tezTaskAttemptID, outputContext, conf, 5, 5,
+        initialMemoryAvailable, shuffleId, partitionToServers);
+
+    rssSorter.collect(new Text("0"), new Text("0"), 0);
+    rssSorter.collect(new Text("0"), new Text("1"), 0);
+    rssSorter.collect(new Text("1"), new Text("1"), 1);
+    rssSorter.collect(new Text("2"), new Text("2"), 2);
+    rssSorter.collect(new Text("3"), new Text("3"), 3);
+    rssSorter.collect(new Text("4"), new Text("4"), 4);
+
+    int[] counts = rssSorter.getNumRecordsPerPartition();
+    assertTrue(2 == counts[0]);
+    assertTrue(1 == counts[1]);
+    assertTrue(1 == counts[2]);
+    assertTrue(1 == counts[3]);
+    assertTrue(1 == counts[4]);
+
+    assertTrue(5 == counts.length);
+  }
+
+
+}
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java
new file mode 100644
index 00000000..8a73a143
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link RssUnorderedPartitionedKVOutput}.
+ */
+public class RssUnorderedPartitionedKVOutputTest {
+  private static Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
+  private Configuration conf;
+  private FileSystem localFs;
+  private Path workingDir;
+
+  /** Builds a configuration with Text keys/values and a hash partitioner. */
+  @BeforeEach
+  public void setup() throws IOException {
+    conf = new Configuration();
+    localFs = FileSystem.getLocal(conf);
+    // Name the directory after this class: cleanup() deletes it recursively, so sharing another
+    // test class's directory would delete that class's files.
+    workingDir = new Path(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir", "/tmp")),
+        RssUnorderedPartitionedKVOutputTest.class.getName()).makeQualified(
+        localFs.getUri(), localFs.getWorkingDirectory());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+        HashPartitioner.class.getName());
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
+  }
+
+  /** Removes the working directory created for the test. */
+  @AfterEach
+  public void cleanup() throws IOException {
+    localFs.delete(workingDir, true);
+  }
+
+  /**
+   * Closing an output that was never started must produce a VertexManagerEvent followed by a
+   * CompositeDataMovementEvent that marks every partition as empty.
+   */
+  @Test
+  @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
+  public void testNonStartedOutput() throws Exception {
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+    int numPartitions = 10;
+    RssUnorderedPartitionedKVOutput output = new RssUnorderedPartitionedKVOutput(outputContext, numPartitions);
+    List<Event> events = output.close();
+    assertEquals(2, events.size());
+    Event event1 = events.get(0);
+    assertTrue(event1 instanceof VertexManagerEvent);
+    Event event2 = events.get(1);
+    assertTrue(event2 instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) event2;
+    ByteBuffer bb = cdme.getUserPayload();
+    ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+        ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+    assertTrue(shufflePayload.hasEmptyPartitions());
+    byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+        .getEmptyPartitions());
+    BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+    assertEquals(numPartitions, emptyPartitionsBitSet.cardinality());
+    for (int i = 0; i < numPartitions; i++) {
+      assertTrue(emptyPartitionsBitSet.get(i));
+    }
+  }
+}