You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/31 12:20:37 UTC

[incubator-uniffle] branch master updated: [#908] feat(tez): Write byte array shuffle data to MapOutput (#909)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b4e109eb [#908] feat(tez): Write byte array shuffle data to MapOutput  (#909)
b4e109eb is described below

commit b4e109eb658e68cb5cc3d2e1c6cc50e448a9feb0
Author: Qing <11...@qq.com>
AuthorDate: Wed May 31 20:20:30 2023 +0800

    [#908] feat(tez): Write byte array shuffle data to MapOutput  (#909)
    
    ### What changes were proposed in this pull request?
    
    Write byte array shuffle data to MapOutput
    
    ### Why are the changes needed?
    
    Fix: [#908](https://github.com/apache/incubator-uniffle/issues/908)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Unit tests.
---
 .../shuffle/orderedgrouped/RssTezBypassWriter.java | 82 ++++++++++++++++++++++
 .../orderedgrouped/RssTezBypassWriterTest.java     | 51 ++++++++++++++
 2 files changed, 133 insertions(+)

diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java
new file mode 100644
index 00000000..22887f19
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ChecksumUtils;
+
+
+
+/**
+ * Copies already-fetched shuffle bytes directly into the container handed out by the merger.
+ *
+ * <p>In Tez shuffle, MapOutput encapsulates the logic to fetch map task's output data via http.
+ * So, in RSS, we should bypass this logic, and directly write data to MapOutput.
+ */
+public class RssTezBypassWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(RssTezBypassWriter.class);
+  // Four bytes ('T', 'I', 'F', 0) written ahead of the payload when the target is a disk input.
+  // NOTE(review): presumably the Tez IFile magic header -- confirm against the reader side.
+  private static final byte[] HEADER = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 0};
+
+  /**
+   * Writes uncompressed shuffle data into an ordered-grouped {@link MapOutput}.
+   *
+   * <p>Only memory-backed outputs are accepted: {@code buffer} is copied to the start of the
+   * output's memory array. {@link System#arraycopy} rejects the copy with an
+   * {@link IndexOutOfBoundsException} when {@code buffer} is longer than that array.
+   *
+   * @param mapOutput the destination reserved by the merger
+   * @param buffer the uncompressed bytes to store
+   * @throws RssException if {@code mapOutput} is disk-backed or of any other non-memory type
+   */
+  public static void write(MapOutput mapOutput, byte[] buffer) {
+    // In the majority of cases, merger allocates memory to accept data,
+    // but when data size exceeds the threshold, merger can also allocate disk.
+    // So, we should consider the two situations, respectively.
+    if (mapOutput.getType() == MapOutput.Type.MEMORY) {
+      byte[] memory = mapOutput.getMemory();
+      System.arraycopy(buffer, 0, memory, 0, buffer.length);
+    } else if (mapOutput.getType() == MapOutput.Type.DISK) {
+      // RSS leverages its own compression, it is incompatible with hadoop's disk file compression.
+      // So we should disable this situation.
+      // NOTE(review): the message below contains the typo "ouput" and names a MapReduce
+      // setting; confirm the matching Tez setting before rewording it.
+      throw new RssException("RSS does not support OnDiskMapOutput as shuffle ouput,"
+          + " try to reduce mapreduce.reduce.shuffle.memory.limit.percent");
+    } else {
+      throw new RssException("Merger reserve unknown type of MapOutput: "
+          + mapOutput.getClass().getCanonicalName());
+    }
+  }
+
+  /**
+   * Writes uncompressed shuffle data into a {@link FetchedInput}.
+   *
+   * <p>For a memory input, {@code buffer} is copied to the start of the input's byte array.
+   * For a disk input, the stream receives {@code HEADER}, then {@code buffer}, then the four
+   * bytes of the buffer's CRC32 narrowed to an {@code int}; the stream is flushed and closed.
+   *
+   * @param mapOutput the destination reserved by the merger
+   * @param buffer the uncompressed bytes to store
+   * @throws IOException if opening, writing, flushing or closing the disk stream fails
+   * @throws RssException if {@code mapOutput} is neither a memory nor a disk input
+   */
+  public static void write(final FetchedInput mapOutput, byte[] buffer) throws IOException {
+    // In the majority of cases, merger allocates memory to accept data,
+    // but when data size exceeds the threshold, merger can also allocate disk.
+    // So, we should consider the two situations, respectively.
+    if (mapOutput.getType() == FetchedInput.Type.MEMORY) {
+      byte[] memory = ((MemoryFetchedInput) mapOutput).getBytes();
+      System.arraycopy(buffer, 0, memory, 0, buffer.length);
+    } else if (mapOutput.getType() == FetchedInput.Type.DISK) {
+      // try-with-resources closes the stream even when one of the writes fails,
+      // so a failed write no longer leaks the underlying file handle.
+      try (OutputStream output = ((DiskFetchedInput) mapOutput).getOutputStream()) {
+        output.write(HEADER);
+        output.write(buffer);
+        output.write(Ints.toByteArray((int) ChecksumUtils.getCrc32(buffer)));
+        output.flush();
+      }
+    } else {
+      throw new RssException("Merger reserve unknown type of MapOutput: "
+          + mapOutput.getClass().getCanonicalName());
+    }
+  }
+}
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriterTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriterTest.java
new file mode 100644
index 00000000..8046bd8f
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriterTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import com.google.common.primitives.Ints;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.util.ChecksumUtils;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link RssTezBypassWriter}. */
+public class RssTezBypassWriterTest {
+  /** Checks that bytes written to a memory-backed MapOutput can be read back from it. */
+  @Test
+  public void testWrite() {
+    byte[] data = new byte[]{1, 2, -1, 1, 2, -1, -1};
+
+    // Destination requested with the same size as the payload: contents must match exactly.
+    MapOutput mapOutput = MapOutput.createMemoryMapOutput(null, null, 7, true);
+    RssTezBypassWriter.write(mapOutput, data);
+    byte[] r = mapOutput.getMemory();
+    assertTrue(Arrays.equals(data, r));
+
+    // Destination requested one byte larger: the whole array differs from the payload,
+    // but the payload must still occupy the leading bytes.
+    // NOTE(review): assumes getMemory() returns an array of the requested size -- confirm.
+    mapOutput = MapOutput.createMemoryMapOutput(null, null, 8, true);
+    RssTezBypassWriter.write(mapOutput, data);
+    r = mapOutput.getMemory();
+    assertFalse(Arrays.equals(data, r));
+    assertTrue(Arrays.equals(data, Arrays.copyOf(r, data.length)));
+  }
+
+  /** Pins the four checksum bytes produced for a fixed payload. */
+  @Test
+  public void testCalcChecksum() throws IOException {
+    byte[] data = new byte[]{1, 2, -1, 1, 2, -1, -1};
+    byte[] result = new byte[]{-71, -87, 19, -71};
+    assertTrue(Arrays.equals(Ints.toByteArray((int) ChecksumUtils.getCrc32(data)), result));
+  }
+}