You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/06/12 09:35:07 UTC
[incubator-uniffle] branch master updated: [#855] feat(tez): Support Tez Output UnorderedPartitionedKVOutput (#943)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 2dbf281f [#855] feat(tez): Support Tez Output UnorderedPartitionedKVOutput (#943)
2dbf281f is described below
commit 2dbf281fb30deb29e6a9dd5a5e6d0758a31651a4
Author: bin41215 <74...@users.noreply.github.com>
AuthorDate: Mon Jun 12 17:35:02 2023 +0800
[#855] feat(tez): Support Tez Output UnorderedPartitionedKVOutput (#943)
### What changes were proposed in this pull request?
Support writing Tez `UnorderedPartitionedKVOutput` data through remote shuffle (Uniffle).
### Why are the changes needed?
Fix: #855
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests (UT).
Co-authored-by: bin3.zhang <bi...@huolala.cn>
---
.../library/common/sort/impl/RssUnSorter.java | 196 ++++++++++++++
.../output/RssUnorderedPartitionedKVOutput.java | 293 +++++++++++++++++++++
.../library/common/sort/impl/RssUnSorterTest.java | 104 ++++++++
.../RssUnorderedPartitionedKVOutputTest.java | 104 ++++++++
4 files changed, 697 insertions(+)
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
new file mode 100644
index 00000000..601cb240
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.IdUtils;
+import org.apache.tez.common.RssTezConfig;
+import org.apache.tez.common.RssTezUtils;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.common.sort.buffer.WriteBufferManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ByteUnit;
+import org.apache.uniffle.storage.util.StorageType;
+
+/**{@link RssUnSorter} is an {@link ExternalSorter}
+ */
+public class RssUnSorter extends ExternalSorter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RssUnSorter.class);
+ private WriteBufferManager bufferManager;
+ private Set<Long> successBlockIds = Sets.newConcurrentHashSet();
+ private Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
+ private Map<Integer, List<ShuffleServerInfo>> partitionToServers;
+ private int[] numRecordsPerPartition;
+
+ /**
+ * Initialization
+ */
+ public RssUnSorter(TezTaskAttemptID tezTaskAttemptID, OutputContext outputContext, Configuration conf,
+ int numMaps, int numOutputs, long initialMemoryAvailable, int shuffleId,
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers) throws IOException {
+ super(outputContext, conf, numOutputs, initialMemoryAvailable);
+ this.partitionToServers = partitionToServers;
+
+ this.numRecordsPerPartition = new int[numOutputs];
+
+ long sortmb = conf.getLong(RssTezConfig.RSS_RUNTIME_IO_SORT_MB, RssTezConfig.RSS_DEFAULT_RUNTIME_IO_SORT_MB);
+ LOG.info("conf.sortmb is {}", sortmb);
+ sortmb = this.availableMemoryMb;
+ LOG.info("sortmb, availableMemoryMb is {}, {}", sortmb, availableMemoryMb);
+ if ((sortmb & 0x7FF) != sortmb) {
+ throw new IOException(
+ "Invalid \"" + RssTezConfig.RSS_RUNTIME_IO_SORT_MB + "\": " + sortmb);
+ }
+ double sortThreshold = conf.getDouble(RssTezConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
+ RssTezConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
+ long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID, IdUtils.getAppAttemptId());
+ long maxSegmentSize = conf.getLong(RssTezConfig.RSS_CLIENT_MAX_BUFFER_SIZE,
+ RssTezConfig.RSS_CLIENT_DEFAULT_MAX_BUFFER_SIZE);
+ long maxBufferSize = conf.getLong(RssTezConfig.RSS_WRITER_BUFFER_SIZE, RssTezConfig.RSS_DEFAULT_WRITER_BUFFER_SIZE);
+ double memoryThreshold = conf.getDouble(RssTezConfig.RSS_CLIENT_MEMORY_THRESHOLD,
+ RssTezConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
+ double sendThreshold = conf.getDouble(RssTezConfig.RSS_CLIENT_SEND_THRESHOLD,
+ RssTezConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
+ int batch = conf.getInt(RssTezConfig.RSS_CLIENT_BATCH_TRIGGER_NUM,
+ RssTezConfig.RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM);
+ String storageType = conf.get(RssTezConfig.RSS_STORAGE_TYPE, RssTezConfig.RSS_DEFAULT_STORAGE_TYPE);
+ if (StringUtils.isEmpty(storageType)) {
+ throw new RssException("storage type mustn't be empty");
+ }
+ long sendCheckInterval = conf.getLong(RssTezConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS,
+ RssTezConfig.RSS_CLIENT_DEFAULT_SEND_CHECK_INTERVAL_MS);
+ long sendCheckTimeout = conf.getLong(RssTezConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
+ RssTezConfig.RSS_CLIENT_DEFAULT_SEND_CHECK_TIMEOUT_MS);
+ int bitmapSplitNum = conf.getInt(RssTezConfig.RSS_CLIENT_BITMAP_NUM, RssTezConfig.RSS_CLIENT_DEFAULT_BITMAP_NUM);
+
+ if (conf.get(RssTezConfig.HIVE_TEZ_LOG_LEVEL, RssTezConfig.DEFAULT_HIVE_TEZ_LOG_LEVEL)
+ .equalsIgnoreCase(RssTezConfig.DEBUG_HIVE_TEZ_LOG_LEVEL)) {
+ LOG.info("sortmb is {}", sortmb);
+ LOG.info("sortThreshold is {}", sortThreshold);
+ LOG.info("taskAttemptId is {}", taskAttemptId);
+ LOG.info("maxSegmentSize is {}", maxSegmentSize);
+ LOG.info("maxBufferSize is {}", maxBufferSize);
+ LOG.info("memoryThreshold is {}", memoryThreshold);
+ LOG.info("sendThreshold is {}", sendThreshold);
+ LOG.info("batch is {}", batch);
+ LOG.info("storageType is {}", storageType);
+ LOG.info("sendCheckInterval is {}", sendCheckInterval);
+ LOG.info("sendCheckTimeout is {}", sendCheckTimeout);
+ LOG.info("bitmapSplitNum is {}", bitmapSplitNum);
+ }
+
+ String containerIdStr =
+ System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
+ LOG.info("containerIdStr is {}", containerIdStr);
+ LOG.info("containerId is {}", containerId);
+ LOG.info("applicationAttemptId is {}", applicationAttemptId.toString());
+
+ bufferManager = new WriteBufferManager(
+ tezTaskAttemptID,
+ (long)(ByteUnit.MiB.toBytes(sortmb) * sortThreshold),
+ applicationAttemptId.toString(),
+ taskAttemptId,
+ successBlockIds,
+ failedBlockIds,
+ RssTezUtils.createShuffleClient(conf),
+ comparator,
+ maxSegmentSize,
+ keySerializer,
+ valSerializer,
+ maxBufferSize,
+ memoryThreshold,
+ sendThreshold,
+ batch,
+ new RssConf(),
+ partitionToServers,
+ numMaps,
+ isMemoryShuffleEnabled(storageType),
+ sendCheckInterval,
+ sendCheckTimeout,
+ bitmapSplitNum,
+ shuffleId,
+ false);
+ LOG.info("Initialized WriteBufferManager.");
+ }
+
+ @Override
+ public void flush() throws IOException {
+ bufferManager.waitSendFinished();
+ }
+
+ @Override
+ public final void close() throws IOException {
+ super.close();
+ bufferManager.freeAllResources();
+ }
+
+ @Override
+ public void write(Object key, Object value) throws IOException {
+ try {
+ collect(key, value, partitioner.getPartition(key, value, partitions));
+ } catch (InterruptedException e) {
+ throw new RssException(e);
+ }
+ }
+
+ synchronized void collect(Object key, Object value, final int partition) throws IOException, InterruptedException {
+ if (key.getClass() != keyClass) {
+ throw new IOException("Type mismatch in key from map: expected "
+ + keyClass.getName() + ", received "
+ + key.getClass().getName());
+ }
+ if (value.getClass() != valClass) {
+ throw new IOException("Type mismatch in value from map: expected "
+ + valClass.getName() + ", received "
+ + value.getClass().getName());
+ }
+ if (partition < 0 || partition >= partitions) {
+ throw new IOException("Illegal partition for " + key + " (" + partition + ")");
+ }
+ bufferManager.addRecord(partition, key, value);
+ numRecordsPerPartition[partition]++;
+ }
+
+ public int[] getNumRecordsPerPartition() {
+ return numRecordsPerPartition;
+ }
+
+ private boolean isMemoryShuffleEnabled(String storageType) {
+ return StorageType.withMemory(StorageType.valueOf(storageType));
+ }
+}
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
new file mode 100644
index 00000000..d0fb0788
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.GetShuffleServerRequest;
+import org.apache.tez.common.GetShuffleServerResponse;
+import org.apache.tez.common.RssTezConfig;
+import org.apache.tez.common.RssTezUtils;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.library.api.KeyValuesWriter;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
+import org.apache.tez.runtime.library.common.sort.impl.RssTezPerPartitionRecord;
+import org.apache.tez.runtime.library.common.sort.impl.RssUnSorter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+
+/**
+ * {@link RssUnorderedPartitionedKVOutput} is an {@link AbstractLogicalOutput} that supports
+ * remote shuffle for unordered, partitioned key/value data.
+ *
+ * <p>{@link #initialize()} asks the application master, over
+ * {@link TezRemoteShuffleUmbilicalProtocol}, which shuffle servers serve each partition.
+ * {@link #start()} creates an {@link RssUnSorter} that receives every record written through
+ * {@link #getWriter()}. {@link #close()} flushes the sorter and generates the shuffle events.
+ */
+@Public
+public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
+  private static final Logger LOG = LoggerFactory.getLogger(RssUnorderedPartitionedKVOutput.class);
+  protected ExternalSorter sorter;
+
+  protected Configuration conf;
+  protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  // System.nanoTime() at initialize() and at a successful close(); not read by this class.
+  private long startTime;
+  private long endTime;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final Deflater deflater;
+  private Map<Integer, List<ShuffleServerInfo>> partitionToServers;
+  private int mapNum;
+  private int numOutputs;
+  private TezTaskAttemptID taskAttemptId;
+  private ApplicationId applicationId;
+  private boolean sendEmptyPartitionDetails;
+  private OutputContext outputContext;
+  // AM shuffle manager address, loaded by getRssConf() from RssTezConfig.RSS_CONF_FILE.
+  private String host;
+  private int port;
+  private String taskVertexName;
+  private String destinationVertexName;
+  private int shuffleId;
+
+  /**
+   * Creates the output. The configuration and shuffle assignment are loaded later, in
+   * {@link #initialize()}.
+   *
+   * @param outputContext      Tez context of this output
+   * @param numPhysicalOutputs number of physical outputs (partitions)
+   */
+  public RssUnorderedPartitionedKVOutput(OutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+    this.outputContext = outputContext;
+    this.deflater = TezCommonUtils.newBestCompressionDeflater();
+    this.numOutputs = getNumPhysicalOutputs();
+    this.mapNum = outputContext.getVertexParallelism();
+    this.applicationId = outputContext.getApplicationId();
+    this.taskAttemptId = TezTaskAttemptID.fromString(
+        RssTezUtils.uniqueIdentifierToAttemptId(outputContext.getUniqueIdentifier()));
+    this.taskVertexName = outputContext.getTaskVertexName();
+    this.destinationVertexName = outputContext.getDestinationVertexName();
+    LOG.info("taskAttemptId is {}", taskAttemptId.toString());
+    LOG.info("taskVertexName is {}", taskVertexName);
+    LOG.info("destinationVertexName is {}", destinationVertexName);
+    LOG.info("Initialized RssUnOrderedPartitionedKVOutput.");
+  }
+
+  /**
+   * Loads the AM shuffle manager host and port from {@link RssTezConfig#RSS_CONF_FILE}.
+   *
+   * <p>Best effort: a failure is logged, and {@code host}/{@code port} keep whatever values
+   * they already had.
+   */
+  private void getRssConf() {
+    try {
+      JobConf rssConf = new JobConf(RssTezConfig.RSS_CONF_FILE);
+      this.host = rssConf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null host");
+      this.port = rssConf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
+
+      LOG.info("Got RssConf am info : host is {}, port is {}", host, port);
+    } catch (Exception e) {
+      LOG.warn("Failed to load RSS conf from {}: ", RssTezConfig.RSS_CONF_FILE, e);
+    }
+  }
+
+  /**
+   * Reads the output configuration, requests initial memory, and fetches the shuffle server
+   * assignment for every partition from the AM.
+   *
+   * @return an empty list; this output emits no events during initialization
+   * @throws Exception if the configuration cannot be read or the request to the AM fails
+   */
+  @Override
+  public List<Event> initialize() throws Exception {
+    this.startTime = System.nanoTime();
+    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+
+    long memRequestSize = RssTezUtils.getInitialMemoryRequirement(conf, getContext().getTotalMemoryAvailableToTask());
+    LOG.info("memRequestSize is {}", memRequestSize);
+    getContext().requestInitialMemory(memRequestSize, memoryUpdateCallbackHandler);
+    LOG.info("Got initialMemory.");
+
+    getRssConf();
+
+    this.sendEmptyPartitionDetails = conf.getBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+        TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
+
+    final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port);
+
+    UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(this.applicationId.toString());
+    TezRemoteShuffleUmbilicalProtocol umbilical = taskOwner
+        .doAs(new PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
+          @Override
+          public TezRemoteShuffleUmbilicalProtocol run() throws Exception {
+            return RPC.getProxy(TezRemoteShuffleUmbilicalProtocol.class,
+                TezRemoteShuffleUmbilicalProtocol.versionID,
+                address,
+                conf);
+          }
+        });
+    TezVertexID tezVertexID = taskAttemptId.getTaskID().getVertexID();
+    TezDAGID tezDAGID = tezVertexID.getDAGId();
+    this.shuffleId = RssTezUtils.computeShuffleId(tezDAGID.getId(), this.taskVertexName, this.destinationVertexName);
+    GetShuffleServerRequest request = new GetShuffleServerRequest(this.taskAttemptId, this.mapNum,
+        this.numOutputs, this.shuffleId);
+    try {
+      GetShuffleServerResponse response = umbilical.getShuffleAssignments(request);
+      this.partitionToServers = response.getShuffleAssignmentsInfoWritable()
+          .getShuffleAssignmentsInfo().getPartitionToServers();
+    } finally {
+      // The proxy is only needed for this single request; release its RPC client.
+      RPC.stopProxy(umbilical);
+    }
+
+    LOG.info("Got response from am.");
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void handleEvents(List<Event> list) {
+    // This output does not react to incoming events; they are ignored.
+  }
+
+  /**
+   * Flushes and closes the sorter, then generates the shuffle events for the written data.
+   * If the output was never started, events for a non-started output are generated instead.
+   */
+  @Override
+  public List<Event> close() throws Exception {
+    List<Event> returnEvents = Lists.newLinkedList();
+    if (sorter != null) {
+      try {
+        sorter.flush();
+      } finally {
+        // Release the sorter's buffers even if flushing to the shuffle servers failed.
+        sorter.close();
+      }
+      this.endTime = System.nanoTime();
+      returnEvents.addAll(generateEvents());
+      sorter = null;
+    } else {
+      LOG.warn(getContext().getDestinationVertexName()
+          + ": Attempting to close output {} of type {} before it was started. "
+          + "Generating empty events",
+          getContext().getDestinationVertexName(), this.getClass().getSimpleName());
+      returnEvents = generateEmptyEvents();
+    }
+    LOG.info("RssUnorderedPartitionedKVOutput close.");
+    return returnEvents;
+  }
+
+  /**
+   * Creates the {@link RssUnSorter} on the first call, once the memory assignment requested
+   * in {@link #initialize()} has been received. Later calls do nothing.
+   */
+  @Override
+  public void start() throws Exception {
+    if (!isStarted.get()) {
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      sorter = new RssUnSorter(taskAttemptId, getContext(), conf, mapNum, numOutputs,
+          memoryUpdateCallbackHandler.getMemoryAssigned(), shuffleId,
+          partitionToServers);
+      LOG.info("Initialized RssUnSorter.");
+      isStarted.set(true);
+    }
+  }
+
+  /**
+   * Returns a writer that forwards every record to the sorter.
+   *
+   * @throws IllegalStateException if {@link #start()} has not completed yet
+   */
+  @Override
+  public Writer getWriter() throws IOException {
+    Preconditions.checkState(isStarted.get(), "Cannot get writer before starting the Output");
+    return new KeyValuesWriter() {
+      @Override
+      public void write(Object key, Iterable<Object> values) throws IOException {
+        sorter.write(key, values);
+      }
+
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        sorter.write(key, value);
+      }
+    };
+  }
+
+  // Builds the (single, final) spill event from the per-partition record counts of the sorter.
+  private List<Event> generateEvents() throws IOException {
+    List<Event> eventList = Lists.newLinkedList();
+    boolean isLastEvent = true;
+
+    String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
+    int[] numRecordsPerPartition = ((RssUnSorter) sorter).getNumRecordsPerPartition();
+
+    RssTezPerPartitionRecord rssTezSpillRecord = new RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+
+    LOG.info("RssTezPerPartitionRecord is initialized");
+
+    ShuffleUtils.generateEventOnSpill(eventList, true, isLastEvent,
+        getContext(), 0, rssTezSpillRecord,
+        getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(),
+        sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), auxiliaryService, deflater);
+    LOG.info("Generate events.");
+    return eventList;
+  }
+
+  // Builds the events Tez expects from an output that was closed without being started.
+  private List<Event> generateEmptyEvents() throws IOException {
+    List<Event> eventList = Lists.newArrayList();
+    ShuffleUtils.generateEventsForNonStartedOutput(eventList,
+        getNumPhysicalOutputs(),
+        getContext(),
+        true,
+        true,
+        deflater);
+    LOG.info("Generate empty events.");
+    return eventList;
+  }
+
+  // Configuration keys this output recognizes; exposed via getConfigurationKeySet().
+  private static final Set<String> confKeys = new HashSet<String>();
+
+  static {
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
+    confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
+    confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
+    confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
+    confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS);
+    confKeys.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID);
+    confKeys.add(
+        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT);
+  }
+
+  /**
+   * Returns an unmodifiable view of the configuration keys this output recognizes.
+   */
+  @InterfaceAudience.Private
+  public static Set<String> getConfigurationKeySet() {
+    return Collections.unmodifiableSet(confKeys);
+  }
+
+}
+
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorterTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorterTest.java
new file mode 100644
index 00000000..af1353f2
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorterTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.output.OutputTestHelpers;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RssUnSorterTest {
+  private static Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
+  private Configuration conf;
+  private FileSystem localFs;
+  private Path workingDir;
+
+  /**
+   * Builds a local configuration with Text keys/values and a hash partitioner, and sets the
+   * CONTAINER_ID environment variable that {@link RssUnSorter} reads in its constructor.
+   */
+  @BeforeEach
+  public void setup() throws Exception {
+    conf = new Configuration();
+    localFs = FileSystem.getLocal(conf);
+    // Use this class's own name so the directory is not shared with RssSorterTest.
+    workingDir = new Path(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir", "/tmp")),
+        RssUnSorterTest.class.getName()).makeQualified(
+        localFs.getUri(), localFs.getWorkingDirectory());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+        HashPartitioner.class.getName());
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
+
+    Map<String, String> env = new HashMap<>(System.getenv());
+    env.put(ApplicationConstants.Environment.CONTAINER_ID.name(), "container_e160_1681717153064_3770270_01_000001");
+
+    RssSorterTest.setEnv(env);
+  }
+
+  /**
+   * Removes the working directory created for the test.
+   */
+  @AfterEach
+  public void cleanup() throws IOException {
+    localFs.delete(workingDir, true);
+  }
+
+  @Test
+  public void testCollectAndRecordsPerPartition() throws IOException, InterruptedException {
+    TezTaskAttemptID tezTaskAttemptID =
+        TezTaskAttemptID.fromString("attempt_1681717153064_3770270_1_00_000000_0");
+
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+
+    long initialMemoryAvailable = 10240000;
+    int shuffleId = 1001;
+
+    RssUnSorter rssSorter = new RssUnSorter(tezTaskAttemptID, outputContext, conf, 5, 5,
+        initialMemoryAvailable, shuffleId, partitionToServers);
+
+    rssSorter.collect(new Text("0"), new Text("0"), 0);
+    rssSorter.collect(new Text("0"), new Text("1"), 0);
+    rssSorter.collect(new Text("1"), new Text("1"), 1);
+    rssSorter.collect(new Text("2"), new Text("2"), 2);
+    rssSorter.collect(new Text("3"), new Text("3"), 3);
+    rssSorter.collect(new Text("4"), new Text("4"), 4);
+
+    int[] numRecordsPerPartition = rssSorter.getNumRecordsPerPartition();
+    assertEquals(5, numRecordsPerPartition.length);
+    assertEquals(2, numRecordsPerPartition[0]);
+    assertEquals(1, numRecordsPerPartition[1]);
+    assertEquals(1, numRecordsPerPartition[2]);
+    assertEquals(1, numRecordsPerPartition[3]);
+    assertEquals(1, numRecordsPerPartition[4]);
+  }
+}
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java
new file mode 100644
index 00000000..8a73a143
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RssUnorderedPartitionedKVOutputTest {
+  private static Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
+  private Configuration conf;
+  private FileSystem localFs;
+  private Path workingDir;
+
+  /**
+   * Builds a local configuration with Text keys/values and a hash partitioner.
+   */
+  @BeforeEach
+  public void setup() throws IOException {
+    conf = new Configuration();
+    localFs = FileSystem.getLocal(conf);
+    // Use this class's own name: cleanup() deletes the directory, so it must not be shared
+    // with RssOrderedPartitionedKVOutputTest.
+    workingDir = new Path(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir", "/tmp")),
+        RssUnorderedPartitionedKVOutputTest.class.getName()).makeQualified(
+        localFs.getUri(), localFs.getWorkingDirectory());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+        HashPartitioner.class.getName());
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
+  }
+
+  /**
+   * Removes the working directory created for the test.
+   */
+  @AfterEach
+  public void cleanup() throws IOException {
+    localFs.delete(workingDir, true);
+  }
+
+  /**
+   * Closing an output that was never started must report every partition as empty.
+   */
+  @Test
+  @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
+  public void testNonStartedOutput() throws Exception {
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
+    int numPartitions = 10;
+    RssUnorderedPartitionedKVOutput output = new RssUnorderedPartitionedKVOutput(outputContext, numPartitions);
+    List<Event> events = output.close();
+    assertEquals(2, events.size());
+    Event event1 = events.get(0);
+    assertTrue(event1 instanceof VertexManagerEvent);
+    Event event2 = events.get(1);
+    assertTrue(event2 instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) event2;
+    ByteBuffer bb = cdme.getUserPayload();
+    ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+        ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+    assertTrue(shufflePayload.hasEmptyPartitions());
+    byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+        .getEmptyPartitions());
+    BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+    assertEquals(numPartitions, emptyPartitionsBitSet.cardinality());
+    for (int i = 0; i < numPartitions; i++) {
+      assertTrue(emptyPartitionsBitSet.get(i));
+    }
+  }
+}