You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/31 12:20:37 UTC
[incubator-uniffle] branch master updated: [#908] feat(tez): Write byte array shuffle data to MapOutput (#909)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b4e109eb [#908] feat(tez): Write byte array shuffle data to MapOutput (#909)
b4e109eb is described below
commit b4e109eb658e68cb5cc3d2e1c6cc50e448a9feb0
Author: Qing <11...@qq.com>
AuthorDate: Wed May 31 20:20:30 2023 +0800
[#908] feat(tez): Write byte array shuffle data to MapOutput (#909)
### What changes were proposed in this pull request?
Write byte array shuffle data to MapOutput
### Why are the changes needed?
Fix: [#908](https://github.com/apache/incubator-uniffle/issues/908)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
---
.../shuffle/orderedgrouped/RssTezBypassWriter.java | 82 ++++++++++++++++++++++
.../orderedgrouped/RssTezBypassWriterTest.java | 51 ++++++++++++++
2 files changed, 133 insertions(+)
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java
new file mode 100644
index 00000000..22887f19
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ChecksumUtils;
+
+
+
+/**
+ * Writes shuffle data straight into the output container reserved by the Tez merger.
+ *
+ * <p>In Tez shuffle, MapOutput encapsulates the logic to fetch a map task's output data via
+ * http. In RSS this logic is bypassed and the data is written to the MapOutput directly.
+ */
+public class RssTezBypassWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(RssTezBypassWriter.class);
+  private static final byte[] HEADER = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 0};
+
+  /**
+   * Copies {@code buffer} to the beginning of the byte array backing an in-memory
+   * {@link MapOutput}.
+   *
+   * @param mapOutput output reserved by the merger; only the MEMORY type is accepted
+   * @param buffer uncompressed shuffle data; must not be longer than the reserved byte array
+   * @throws RssException if {@code mapOutput} is of DISK type or of any other non-MEMORY type
+   */
+  public static void write(MapOutput mapOutput, byte[] buffer) {
+    // In the majority of cases, merger allocates memory to accept data,
+    // but when data size exceeds the threshold, merger can also allocate disk.
+    // So, we should consider the two situations, respectively.
+    if (mapOutput.getType() == MapOutput.Type.MEMORY) {
+      byte[] memory = mapOutput.getMemory();
+      System.arraycopy(buffer, 0, memory, 0, buffer.length);
+    } else if (mapOutput.getType() == MapOutput.Type.DISK) {
+      // RSS leverages its own compression, it is incompatible with hadoop's disk file compression.
+      // So we should disable this situation.
+      throw new RssException("RSS does not support OnDiskMapOutput as shuffle ouput,"
+          + " try to reduce mapreduce.reduce.shuffle.memory.limit.percent");
+    } else {
+      throw new RssException("Merger reserve unknown type of MapOutput: "
+          + mapOutput.getClass().getCanonicalName());
+    }
+  }
+
+  /**
+   * Writes {@code buffer} to a {@link FetchedInput}.
+   *
+   * <p>For the MEMORY type the data is copied to the beginning of the input's byte array. For
+   * the DISK type the output stream receives {@code HEADER}, then the data, then the CRC32 of
+   * the data narrowed to an int and encoded as 4 bytes; the stream is flushed and closed.
+   *
+   * @param mapOutput input reserved by the merger, of MEMORY or DISK type
+   * @param buffer uncompressed shuffle data
+   * @throws IOException if opening, writing, flushing or closing the disk output stream fails
+   * @throws RssException if {@code mapOutput} is neither MEMORY nor DISK
+   */
+  public static void write(final FetchedInput mapOutput, byte[] buffer) throws IOException {
+    if (mapOutput.getType() == FetchedInput.Type.MEMORY) {
+      byte[] memory = ((MemoryFetchedInput) mapOutput).getBytes();
+      System.arraycopy(buffer, 0, memory, 0, buffer.length);
+    } else if (mapOutput.getType() == FetchedInput.Type.DISK) {
+      // try-with-resources guarantees the stream is closed even if a write fails midway,
+      // which the previous explicit close() at the end of the happy path did not.
+      try (OutputStream output = ((DiskFetchedInput) mapOutput).getOutputStream()) {
+        output.write(HEADER);
+        output.write(buffer);
+        output.write(Ints.toByteArray((int) ChecksumUtils.getCrc32(buffer)));
+        output.flush();
+      }
+    } else {
+      throw new RssException("Merger reserve unknown type of MapOutput: "
+          + mapOutput.getClass().getCanonicalName());
+    }
+  }
+}
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriterTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriterTest.java
new file mode 100644
index 00000000..8046bd8f
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriterTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import com.google.common.primitives.Ints;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.util.ChecksumUtils;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link RssTezBypassWriter} and for the checksum bytes it appends. */
+public class RssTezBypassWriterTest {
+  /** Data written to an in-memory MapOutput created with 7 must be read back unchanged. */
+  @Test
+  public void testWrite() {
+    final byte[] payload = new byte[]{1, 2, -1, 1, 2, -1, -1};
+    MapOutput target = MapOutput.createMemoryMapOutput(null, null, 7, true);
+    RssTezBypassWriter.write(target, payload);
+    assertTrue(Arrays.equals(payload, target.getMemory()));
+
+    // A fresh output created with 8 instead of 7, and never written to, must not match.
+    MapOutput untouched = MapOutput.createMemoryMapOutput(null, null, 8, true);
+    assertFalse(Arrays.equals(payload, untouched.getMemory()));
+  }
+
+  /** The CRC32 of the sample data, narrowed to int and turned into bytes, is a fixed value. */
+  @Test
+  public void testCalcChecksum() throws IOException {
+    final byte[] payload = new byte[]{1, 2, -1, 1, 2, -1, -1};
+    final byte[] expected = new byte[]{-71, -87, 19, -71};
+    final byte[] actual = Ints.toByteArray((int) ChecksumUtils.getCrc32(payload));
+    assertTrue(Arrays.equals(actual, expected));
+  }
+}