You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/02/17 09:59:31 UTC

[21/50] [abbrv] ignite git commit: IGNITE-4598: Hadoop: implemented raw comparator for BytesWritable key type. This closes #1457.

IGNITE-4598: Hadoop: implemented raw comparator for BytesWritable key type. This closes #1457.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3db0971d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3db0971d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3db0971d

Branch: refs/heads/ignite-1.9
Commit: 3db0971d7d32798aeb3ce5dd8c0b3246a895fe91
Parents: 7f0af43
Author: devozerov <vo...@gridgain.com>
Authored: Tue Jan 24 16:45:59 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Jan 24 16:54:21 2017 +0300

----------------------------------------------------------------------
 .../io/BytesWritablePartiallyRawComparator.java | 51 +++++++++++++++
 .../hadoop/io/TextPartiallyRawComparator.java   | 68 +-------------------
 .../processors/hadoop/impl/HadoopUtils.java     | 66 +++++++++++++++++++
 .../hadoop/impl/v2/HadoopV2TaskContext.java     | 13 +++-
 4 files changed, 129 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3db0971d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/BytesWritablePartiallyRawComparator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/BytesWritablePartiallyRawComparator.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/BytesWritablePartiallyRawComparator.java
new file mode 100644
index 0000000..da9240b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/BytesWritablePartiallyRawComparator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.io;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.io.OffheapRawMemory;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
+
+/**
+ * Partial raw comparator for {@link BytesWritable} data type.
+ * <p>
+ * Implementation is borrowed from {@code org.apache.hadoop.io.FastByteComparisons} and adopted to Ignite
+ * infrastructure.
+ */
+public class BytesWritablePartiallyRawComparator implements PartiallyRawComparator<BytesWritable>,
+    PartiallyOffheapRawComparatorEx<BytesWritable> {
+    /** Length bytes. */
+    private static final int LEN_BYTES = 4;
+
+    /** {@inheritDoc} */
+    @Override public int compare(BytesWritable val1, RawMemory val2Buf) {
+        if (val2Buf instanceof OffheapRawMemory) {
+            OffheapRawMemory val2Buf0 = (OffheapRawMemory)val2Buf;
+
+            return compare(val1, val2Buf0.pointer(), val2Buf0.length());
+        }
+        else
+            throw new UnsupportedOperationException("Text can be compared only with offheap memory.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compare(BytesWritable val1, long val2Ptr, int val2Len) {
+        return HadoopUtils.compareBytes(val1.getBytes(), val1.getLength(), val2Ptr + LEN_BYTES, val2Len - LEN_BYTES);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3db0971d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
index a2bc3d4..e82f5e4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
@@ -17,10 +17,9 @@
 
 package org.apache.ignite.hadoop.io;
 
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedBytes;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.io.OffheapRawMemory;
 import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
 import org.apache.ignite.internal.util.GridUnsafe;
@@ -47,69 +46,6 @@ public class TextPartiallyRawComparator implements PartiallyRawComparator<Text>,
     @Override public int compare(Text val1, long val2Ptr, int val2Len) {
         int len2 = WritableUtils.decodeVIntSize(GridUnsafe.getByte(val2Ptr));
 
-        return compareBytes(val1.getBytes(), val1.getLength(), val2Ptr + len2, val2Len - len2);
-    }
-
-    /**
-     * Internal comparison routine.
-     *
-     * @param buf1 Bytes 1.
-     * @param len1 Length 1.
-     * @param ptr2 Pointer 2.
-     * @param len2 Length 2.
-     * @return Result.
-     */
-    @SuppressWarnings("SuspiciousNameCombination")
-    private static int compareBytes(byte[] buf1, int len1, long ptr2, int len2) {
-        int minLength = Math.min(len1, len2);
-
-        int minWords = minLength / Longs.BYTES;
-
-        for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
-            long lw = GridUnsafe.getLong(buf1, GridUnsafe.BYTE_ARR_OFF + i);
-            long rw = GridUnsafe.getLong(ptr2 + i);
-
-            long diff = lw ^ rw;
-
-            if (diff != 0) {
-                if (GridUnsafe.BIG_ENDIAN)
-                    return (lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE) ? -1 : 1;
-
-                // Use binary search
-                int n = 0;
-                int y;
-                int x = (int) diff;
-
-                if (x == 0) {
-                    x = (int) (diff >>> 32);
-
-                    n = 32;
-                }
-
-                y = x << 16;
-
-                if (y == 0)
-                    n += 16;
-                else
-                    x = y;
-
-                y = x << 8;
-
-                if (y == 0)
-                    n += 8;
-
-                return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
-            }
-        }
-
-        // The epilogue to cover the last (minLength % 8) elements.
-        for (int i = minWords * Longs.BYTES; i < minLength; i++) {
-            int res = UnsignedBytes.compare(buf1[i], GridUnsafe.getByte(ptr2 + i));
-
-            if (res != 0)
-                return res;
-        }
-
-        return len1 - len2;
+        return HadoopUtils.compareBytes(val1.getBytes(), val1.getLength(), val2Ptr + len2, val2Len - len2);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3db0971d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
index a34388d..767e10a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.hadoop.impl;
 
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedBytes;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
@@ -32,6 +34,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
 import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 import java.io.ByteArrayInputStream;
@@ -328,4 +331,67 @@ public class HadoopUtils {
             HadoopCommonUtils.restoreContextClassLoader(oldLdr);
         }
     }
+
+    /**
+     * Internal comparison routine.
+     *
+     * @param buf1 Bytes 1.
+     * @param len1 Length 1.
+     * @param ptr2 Pointer 2.
+     * @param len2 Length 2.
+     * @return Result.
+     */
+    @SuppressWarnings("SuspiciousNameCombination")
+    public static int compareBytes(byte[] buf1, int len1, long ptr2, int len2) {
+        int minLength = Math.min(len1, len2);
+
+        int minWords = minLength / Longs.BYTES;
+
+        for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+            long lw = GridUnsafe.getLong(buf1, GridUnsafe.BYTE_ARR_OFF + i);
+            long rw = GridUnsafe.getLong(ptr2 + i);
+
+            long diff = lw ^ rw;
+
+            if (diff != 0) {
+                if (GridUnsafe.BIG_ENDIAN)
+                    return (lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE) ? -1 : 1;
+
+                // Use binary search
+                int n = 0;
+                int y;
+                int x = (int) diff;
+
+                if (x == 0) {
+                    x = (int) (diff >>> 32);
+
+                    n = 32;
+                }
+
+                y = x << 16;
+
+                if (y == 0)
+                    n += 16;
+                else
+                    x = y;
+
+                y = x << 8;
+
+                if (y == 0)
+                    n += 8;
+
+                return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+            }
+        }
+
+        // The epilogue to cover the last (minLength % 8) elements.
+        for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+            int res = UnsignedBytes.compare(buf1[i], GridUnsafe.getByte(ptr2 + i));
+
+            if (res != 0)
+                return res;
+        }
+
+        return len1 - len2;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/3db0971d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index 475e43d..b14dc47 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.Deserializer;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.hadoop.io.BytesWritablePartiallyRawComparator;
 import org.apache.ignite.hadoop.io.PartiallyRawComparator;
 import org.apache.ignite.hadoop.io.TextPartiallyRawComparator;
 import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
@@ -49,7 +51,6 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
 import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
 import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
 import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
@@ -156,6 +157,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
 
         COMBINE_KEY_GROUPING_SUPPORTED = ok;
 
+        PARTIAL_COMPARATORS.put(ByteWritable.class.getName(), BytesWritablePartiallyRawComparator.class.getName());
         PARTIAL_COMPARATORS.put(Text.class.getName(), TextPartiallyRawComparator.class.getName());
     }
 
@@ -602,11 +604,16 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         if (clsName == null) {
             Class keyCls = conf.getMapOutputKeyClass();
 
-            if (keyCls != null) {
+            while (keyCls != null) {
                 clsName = PARTIAL_COMPARATORS.get(keyCls.getName());
 
-                if (clsName != null)
+                if (clsName != null) {
                     conf.set(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(), clsName);
+
+                    break;
+                }
+
+                keyCls = keyCls.getSuperclass();
             }
         }
     }