You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/02/17 09:59:31 UTC
[21/50] [abbrv] ignite git commit: IGNITE-4598: Hadoop: implemented
raw comparator for BytesWritable key type. This closes #1457.
IGNITE-4598: Hadoop: implemented raw comparator for BytesWritable key type. This closes #1457.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3db0971d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3db0971d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3db0971d
Branch: refs/heads/ignite-1.9
Commit: 3db0971d7d32798aeb3ce5dd8c0b3246a895fe91
Parents: 7f0af43
Author: devozerov <vo...@gridgain.com>
Authored: Tue Jan 24 16:45:59 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Jan 24 16:54:21 2017 +0300
----------------------------------------------------------------------
.../io/BytesWritablePartiallyRawComparator.java | 51 +++++++++++++++
.../hadoop/io/TextPartiallyRawComparator.java | 68 +-------------------
.../processors/hadoop/impl/HadoopUtils.java | 66 +++++++++++++++++++
.../hadoop/impl/v2/HadoopV2TaskContext.java | 13 +++-
4 files changed, 129 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3db0971d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/BytesWritablePartiallyRawComparator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/BytesWritablePartiallyRawComparator.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/BytesWritablePartiallyRawComparator.java
new file mode 100644
index 0000000..da9240b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/BytesWritablePartiallyRawComparator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.io;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.io.OffheapRawMemory;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
+
+/**
+ * Partial raw comparator for {@link BytesWritable} data type.
+ * <p>
+ * Implementation is borrowed from {@code org.apache.hadoop.io.FastByteComparisons} and adopted to Ignite
+ * infrastructure.
+ */
+public class BytesWritablePartiallyRawComparator implements PartiallyRawComparator<BytesWritable>,
+ PartiallyOffheapRawComparatorEx<BytesWritable> {
+ /** Length bytes. */
+ private static final int LEN_BYTES = 4;
+
+ /** {@inheritDoc} */
+ @Override public int compare(BytesWritable val1, RawMemory val2Buf) {
+ if (val2Buf instanceof OffheapRawMemory) {
+ OffheapRawMemory val2Buf0 = (OffheapRawMemory)val2Buf;
+
+ return compare(val1, val2Buf0.pointer(), val2Buf0.length());
+ }
+ else
+ throw new UnsupportedOperationException("Text can be compared only with offheap memory.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compare(BytesWritable val1, long val2Ptr, int val2Len) {
+ return HadoopUtils.compareBytes(val1.getBytes(), val1.getLength(), val2Ptr + LEN_BYTES, val2Len - LEN_BYTES);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3db0971d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
index a2bc3d4..e82f5e4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
@@ -17,10 +17,9 @@
package org.apache.ignite.hadoop.io;
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedBytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.io.OffheapRawMemory;
import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
import org.apache.ignite.internal.util.GridUnsafe;
@@ -47,69 +46,6 @@ public class TextPartiallyRawComparator implements PartiallyRawComparator<Text>,
@Override public int compare(Text val1, long val2Ptr, int val2Len) {
int len2 = WritableUtils.decodeVIntSize(GridUnsafe.getByte(val2Ptr));
- return compareBytes(val1.getBytes(), val1.getLength(), val2Ptr + len2, val2Len - len2);
- }
-
- /**
- * Internal comparison routine.
- *
- * @param buf1 Bytes 1.
- * @param len1 Length 1.
- * @param ptr2 Pointer 2.
- * @param len2 Length 2.
- * @return Result.
- */
- @SuppressWarnings("SuspiciousNameCombination")
- private static int compareBytes(byte[] buf1, int len1, long ptr2, int len2) {
- int minLength = Math.min(len1, len2);
-
- int minWords = minLength / Longs.BYTES;
-
- for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
- long lw = GridUnsafe.getLong(buf1, GridUnsafe.BYTE_ARR_OFF + i);
- long rw = GridUnsafe.getLong(ptr2 + i);
-
- long diff = lw ^ rw;
-
- if (diff != 0) {
- if (GridUnsafe.BIG_ENDIAN)
- return (lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE) ? -1 : 1;
-
- // Use binary search
- int n = 0;
- int y;
- int x = (int) diff;
-
- if (x == 0) {
- x = (int) (diff >>> 32);
-
- n = 32;
- }
-
- y = x << 16;
-
- if (y == 0)
- n += 16;
- else
- x = y;
-
- y = x << 8;
-
- if (y == 0)
- n += 8;
-
- return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
- }
- }
-
- // The epilogue to cover the last (minLength % 8) elements.
- for (int i = minWords * Longs.BYTES; i < minLength; i++) {
- int res = UnsignedBytes.compare(buf1[i], GridUnsafe.getByte(ptr2 + i));
-
- if (res != 0)
- return res;
- }
-
- return len1 - len2;
+ return HadoopUtils.compareBytes(val1.getBytes(), val1.getLength(), val2Ptr + len2, val2Len - len2);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3db0971d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
index a34388d..767e10a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.hadoop.impl;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedBytes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
@@ -32,6 +34,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
import java.io.ByteArrayInputStream;
@@ -328,4 +331,67 @@ public class HadoopUtils {
HadoopCommonUtils.restoreContextClassLoader(oldLdr);
}
}
+
+ /**
+ * Internal comparison routine.
+ *
+ * @param buf1 Bytes 1.
+ * @param len1 Length 1.
+ * @param ptr2 Pointer 2.
+ * @param len2 Length 2.
+ * @return Result.
+ */
+ @SuppressWarnings("SuspiciousNameCombination")
+ public static int compareBytes(byte[] buf1, int len1, long ptr2, int len2) {
+ int minLength = Math.min(len1, len2);
+
+ int minWords = minLength / Longs.BYTES;
+
+ for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+ long lw = GridUnsafe.getLong(buf1, GridUnsafe.BYTE_ARR_OFF + i);
+ long rw = GridUnsafe.getLong(ptr2 + i);
+
+ long diff = lw ^ rw;
+
+ if (diff != 0) {
+ if (GridUnsafe.BIG_ENDIAN)
+ return (lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE) ? -1 : 1;
+
+ // Use binary search
+ int n = 0;
+ int y;
+ int x = (int) diff;
+
+ if (x == 0) {
+ x = (int) (diff >>> 32);
+
+ n = 32;
+ }
+
+ y = x << 16;
+
+ if (y == 0)
+ n += 16;
+ else
+ x = y;
+
+ y = x << 8;
+
+ if (y == 0)
+ n += 8;
+
+ return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+ }
+ }
+
+ // The epilogue to cover the last (minLength % 8) elements.
+ for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+ int res = UnsignedBytes.compare(buf1[i], GridUnsafe.getByte(ptr2 + i));
+
+ if (res != 0)
+ return res;
+ }
+
+ return len1 - len2;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/3db0971d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index 475e43d..b14dc47 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Deserializer;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.hadoop.io.BytesWritablePartiallyRawComparator;
import org.apache.ignite.hadoop.io.PartiallyRawComparator;
import org.apache.ignite.hadoop.io.TextPartiallyRawComparator;
import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
@@ -49,7 +51,6 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
@@ -156,6 +157,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
COMBINE_KEY_GROUPING_SUPPORTED = ok;
+ PARTIAL_COMPARATORS.put(BytesWritable.class.getName(), BytesWritablePartiallyRawComparator.class.getName());
PARTIAL_COMPARATORS.put(Text.class.getName(), TextPartiallyRawComparator.class.getName());
}
@@ -602,11 +604,16 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
if (clsName == null) {
Class keyCls = conf.getMapOutputKeyClass();
- if (keyCls != null) {
+ while (keyCls != null) {
clsName = PARTIAL_COMPARATORS.get(keyCls.getName());
- if (clsName != null)
+ if (clsName != null) {
conf.set(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(), clsName);
+
+ break;
+ }
+
+ keyCls = keyCls.getSuperclass();
}
}
}