You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/15 08:59:07 UTC
ignite git commit: IGNITE-4277: Hadoop: implemented "partially raw"
comparator. This closes #1345.
Repository: ignite
Updated Branches:
refs/heads/master 7094c0fd8 -> 7cc495bf6
IGNITE-4277: Hadoop: implemented "partially raw" comparator. This closes #1345.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7cc495bf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7cc495bf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7cc495bf
Branch: refs/heads/master
Commit: 7cc495bf6b1dedc3a87046f8cd373766eaa33463
Parents: 7094c0f
Author: devozerov <vo...@gridgain.com>
Authored: Thu Dec 15 11:58:28 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Dec 15 11:58:28 2016 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopClassLoader.java | 1 +
.../processors/hadoop/HadoopJobProperty.java | 6 +-
.../processors/hadoop/HadoopTaskContext.java | 8 ++
.../io/PartiallyOffheapRawComparatorEx.java | 33 +++++
.../hadoop/io/PartiallyRawComparator.java | 33 +++++
.../org/apache/ignite/hadoop/io/RawMemory.java | 86 ++++++++++++
.../hadoop/io/TextPartiallyRawComparator.java | 115 ++++++++++++++++
.../apache/ignite/hadoop/io/package-info.java | 22 ++++
...DelegatingPartiallyOffheapRawComparator.java | 54 ++++++++
.../hadoop/impl/v2/HadoopV2TaskContext.java | 21 +++
.../processors/hadoop/io/OffheapRawMemory.java | 131 +++++++++++++++++++
.../shuffle/collections/HadoopSkipList.java | 14 +-
.../hadoop/impl/HadoopTeraSortTest.java | 7 +
.../collections/HadoopAbstractMapTest.java | 6 +
14 files changed, 535 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index f6c2fa9..81c1405 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -372,6 +372,7 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache {
// We use "contains" instead of "equals" to handle subclasses properly.
if (clsName.contains("org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem") ||
clsName.contains("org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem") ||
+ clsName.contains("org.apache.ignite.hadoop.io.TextPartiallyRawComparator") ||
clsName.contains("org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider"))
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index 9e1dede..4122eef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -64,6 +64,11 @@ public enum HadoopJobProperty {
JOB_SHARED_CLASSLOADER("ignite.job.shared.classloader"),
/**
+ * Fully qualified name of partially-raw comparator which should be used on sorting phase.
+ */
+ JOB_PARTIAL_RAW_COMPARATOR("ignite.job.partial.raw.comparator"),
+
+ /**
* Size in bytes of single memory page which will be allocated for data structures in shuffle.
* <p>
* By default is {@code 32 * 1024}.
@@ -112,7 +117,6 @@ public enum HadoopJobProperty {
*/
SHUFFLE_JOB_THROTTLE("ignite.shuffle.job.throttle");
-
/** Property name. */
private final String propName;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index ecb9f26..dddd017 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
/**
* Task context.
@@ -157,6 +158,13 @@ public abstract class HadoopTaskContext {
public abstract Comparator<Object> sortComparator();
/**
+ * Gets partially raw sorting comparator, if one is configured for the job.
+ *
+ * @return Partially raw sorting comparator or {@code null} if it is not configured.
+ */
+ public abstract PartiallyOffheapRawComparatorEx<Object> partialRawSortComparator();
+
+ /**
* Gets comparator for grouping on combine or reduce operation.
*
* @return Comparator.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/io/PartiallyOffheapRawComparatorEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/io/PartiallyOffheapRawComparatorEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/io/PartiallyOffheapRawComparatorEx.java
new file mode 100644
index 0000000..157609e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/io/PartiallyOffheapRawComparatorEx.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.io;
+
+/**
+ * Special version of partially raw comparator which receives the second value as a raw memory pointer.
+ */
+public interface PartiallyOffheapRawComparatorEx<T> {
+ /**
+ * Compares an object with value data located at the given memory address.
+ *
+ * @param val1 First value (object).
+ * @param val2Ptr Pointer to the second value data.
+ * @param val2Len Length of the second value data, in bytes.
+ * @return Comparison result (presumably negative, zero or positive, as for {@link java.util.Comparator}).
+ */
+ int compare(T val1, long val2Ptr, int val2Len);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/PartiallyRawComparator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/PartiallyRawComparator.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/PartiallyRawComparator.java
new file mode 100644
index 0000000..b9a4505
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/PartiallyRawComparator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.io;
+
+/**
+ * Partially raw comparator. Compares a deserialized value with a serialized one.
+ */
+public interface PartiallyRawComparator<T> {
+ /**
+ * Compares a deserialized value with a serialized one.
+ *
+ * @param val1 First value (deserialized).
+ * @param val2Buf Second value (serialized).
+ * @return A negative integer, zero, or a positive integer as the first value is less than, equal to, or greater
+ * than the second value.
+ */
+ int compare(T val1, RawMemory val2Buf);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/RawMemory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/RawMemory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/RawMemory.java
new file mode 100644
index 0000000..8dcaf83
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/RawMemory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.io;
+
+/**
+ * Memory abstraction for raw comparison. Exposes only index-based getters for primitive values.
+ */
+public interface RawMemory {
+ /**
+ * Get byte value at the given index.
+ *
+ * @param idx Index (byte offset).
+ * @return Value.
+ */
+ byte get(int idx);
+
+ /**
+ * Get short value at the given index.
+ *
+ * @param idx Index (byte offset).
+ * @return Value.
+ */
+ short getShort(int idx);
+
+ /**
+ * Get char value at the given index.
+ *
+ * @param idx Index (byte offset).
+ * @return Value.
+ */
+ char getChar(int idx);
+
+ /**
+ * Get int value at the given index.
+ *
+ * @param idx Index (byte offset).
+ * @return Value.
+ */
+ int getInt(int idx);
+
+ /**
+ * Get long value at the given index.
+ *
+ * @param idx Index (byte offset).
+ * @return Value.
+ */
+ long getLong(int idx);
+
+ /**
+ * Get float value at the given index.
+ *
+ * @param idx Index (byte offset).
+ * @return Value.
+ */
+ float getFloat(int idx);
+
+ /**
+ * Get double value at the given index.
+ *
+ * @param idx Index (byte offset).
+ * @return Value.
+ */
+ double getDouble(int idx);
+
+ /**
+ * Get length of the memory in bytes.
+ *
+ * @return Length in bytes.
+ */
+ int length();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
new file mode 100644
index 0000000..a2bc3d4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.io;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.ignite.internal.processors.hadoop.io.OffheapRawMemory;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/**
+ * Partial raw comparator for {@link Text} data type.
+ * <p>
+ * Implementation is borrowed from {@code org.apache.hadoop.io.FastByteComparisons} and adapted to Ignite
+ * infrastructure.
+ */
+public class TextPartiallyRawComparator implements PartiallyRawComparator<Text>, PartiallyOffheapRawComparatorEx<Text> {
+ /** {@inheritDoc} */
+ @Override public int compare(Text val1, RawMemory val2Buf) {
+ if (val2Buf instanceof OffheapRawMemory) {
+ OffheapRawMemory val2Buf0 = (OffheapRawMemory)val2Buf;
+
+ return compare(val1, val2Buf0.pointer(), val2Buf0.length());
+ }
+ else
+ throw new UnsupportedOperationException("Text can be compared only with offheap memory.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compare(Text val1, long val2Ptr, int val2Len) {
+ int len2 = WritableUtils.decodeVIntSize(GridUnsafe.getByte(val2Ptr));
+
+ return compareBytes(val1.getBytes(), val1.getLength(), val2Ptr + len2, val2Len - len2);
+ }
+
+ /**
+ * Compares on-heap bytes with off-heap bytes lexicographically, treating every byte as unsigned.
+ *
+ * @param buf1 Bytes of the first value (on-heap array).
+ * @param len1 Number of bytes of the first value.
+ * @param ptr2 Pointer to the bytes of the second value.
+ * @param len2 Number of bytes of the second value.
+ * @return Negative value, zero or positive value if the first value is less, equal or greater respectively.
+ */
+ @SuppressWarnings("SuspiciousNameCombination")
+ private static int compareBytes(byte[] buf1, int len1, long ptr2, int len2) {
+ int minLength = Math.min(len1, len2);
+
+ int minWords = minLength / Longs.BYTES;
+
+ for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+ long lw = GridUnsafe.getLong(buf1, GridUnsafe.BYTE_ARR_OFF + i);
+ long rw = GridUnsafe.getLong(ptr2 + i);
+
+ long diff = lw ^ rw;
+
+ if (diff != 0) {
+ if (GridUnsafe.BIG_ENDIAN)
+ return (lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE) ? -1 : 1;
+
+ // Little-endian: binary search for the bit offset (n) of the lowest-order differing byte.
+ int n = 0;
+ int y;
+ int x = (int) diff;
+
+ if (x == 0) {
+ x = (int) (diff >>> 32);
+
+ n = 32;
+ }
+
+ y = x << 16;
+
+ if (y == 0)
+ n += 16;
+ else
+ x = y;
+
+ y = x << 8;
+
+ if (y == 0)
+ n += 8;
+
+ return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+ }
+ }
+
+ // The epilogue to cover the last (minLength % 8) bytes which do not fill a whole long word.
+ for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+ int res = UnsignedBytes.compare(buf1[i], GridUnsafe.getByte(ptr2 + i));
+
+ if (res != 0)
+ return res;
+ }
+
+ return len1 - len2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/package-info.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/package-info.java
new file mode 100644
index 0000000..0d1f7b9
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains <b>Hadoop Accelerator</b> API for input-output operations.
+ */
+package org.apache.ignite.hadoop.io;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2DelegatingPartiallyOffheapRawComparator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2DelegatingPartiallyOffheapRawComparator.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2DelegatingPartiallyOffheapRawComparator.java
new file mode 100644
index 0000000..e6d369e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2DelegatingPartiallyOffheapRawComparator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.ignite.hadoop.io.PartiallyRawComparator;
+import org.apache.ignite.internal.processors.hadoop.io.OffheapRawMemory;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
+
+/**
+ * Delegating partial raw comparator: wraps the raw pointer into memory object and calls the target comparator.
+ */
+public class HadoopV2DelegatingPartiallyOffheapRawComparator<T> implements PartiallyOffheapRawComparatorEx<T> {
+ /** Target comparator. */
+ private final PartiallyRawComparator<T> target;
+
+ /** Memory wrapper, created on the first compare and re-pointed on every subsequent call. */
+ private OffheapRawMemory mem;
+
+ /**
+ * Constructor.
+ *
+ * @param target Target comparator to delegate to (asserted to be non-null).
+ */
+ public HadoopV2DelegatingPartiallyOffheapRawComparator(PartiallyRawComparator<T> target) {
+ assert target != null;
+
+ this.target = target;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compare(T val1, long val2Ptr, int val2Len) {
+ if (mem == null)
+ mem = new OffheapRawMemory(val2Ptr, val2Len);
+ else
+ mem.update(val2Ptr, val2Len);
+
+ return target.compare(val1, mem);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index d444f2b..42bbec5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -38,13 +38,16 @@ import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.hadoop.io.PartiallyRawComparator;
import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
@@ -62,6 +65,7 @@ import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Partitioner;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1ReduceTask;
import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1SetupTask;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -421,11 +425,28 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public Comparator<Object> sortComparator() {
return (Comparator<Object>)jobCtx.getSortComparator();
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public PartiallyOffheapRawComparatorEx<Object> partialRawSortComparator() {
+ Class cls = jobCtx.getJobConf().getClass(HadoopJobProperty.JOB_PARTIAL_RAW_COMPARATOR.propertyName(), null);
+
+ if (cls == null)
+ return null;
+
+ Object res = ReflectionUtils.newInstance(cls, jobConf());
+
+ if (res instanceof PartiallyOffheapRawComparatorEx)
+ return (PartiallyOffheapRawComparatorEx)res;
+ else
+ return new HadoopV2DelegatingPartiallyOffheapRawComparator<>((PartiallyRawComparator)res);
+ }
+
+ /** {@inheritDoc} */
@Override public Comparator<Object> groupComparator() {
Comparator<?> res;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/io/OffheapRawMemory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/io/OffheapRawMemory.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/io/OffheapRawMemory.java
new file mode 100644
index 0000000..564f92c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/io/OffheapRawMemory.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.io;
+
+import org.apache.ignite.hadoop.io.RawMemory;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Offheap-based memory: bounds-checked access to a region defined by a raw pointer and a length.
+ */
+public class OffheapRawMemory implements RawMemory {
+ /** Pointer. */
+ private long ptr;
+
+ /** Length. */
+ private int len;
+
+ /**
+ * Constructor.
+ *
+ * @param ptr Pointer.
+ * @param len Length.
+ */
+ public OffheapRawMemory(long ptr, int len) {
+ update(ptr, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte get(int idx) {
+ ensure(idx, 1);
+
+ return GridUnsafe.getByte(ptr + idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short getShort(int idx) {
+ ensure(idx, 2);
+
+ return GridUnsafe.getShort(ptr + idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public char getChar(int idx) {
+ ensure(idx, 2);
+
+ return GridUnsafe.getChar(ptr + idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getInt(int idx) {
+ ensure(idx, 4);
+
+ return GridUnsafe.getInt(ptr + idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getLong(int idx) {
+ ensure(idx, 8);
+
+ return GridUnsafe.getLong(ptr + idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getFloat(int idx) {
+ ensure(idx, 4);
+
+ return GridUnsafe.getFloat(ptr + idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double getDouble(int idx) {
+ ensure(idx, 8);
+
+ return GridUnsafe.getDouble(ptr + idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int length() {
+ return len;
+ }
+
+ /**
+ * @return Raw pointer.
+ */
+ public long pointer() {
+ return ptr;
+ }
+
+ /**
+ * Update pointer and length, so that the same instance can be reused for another memory region.
+ *
+ * @param ptr Pointer.
+ * @param len Length.
+ */
+ public void update(long ptr, int len) {
+ this.ptr = ptr;
+ this.len = len;
+ }
+
+ /**
+ * Ensure that the given number of bytes are available for read, throw an exception otherwise (overflow-safe).
+ *
+ * @param idx Index.
+ * @param cnt Count.
+ */
+ private void ensure(int idx, int cnt) {
+ if (idx < 0 || (long)idx + cnt > len)
+ throw new IndexOutOfBoundsException("Illegal index [len=" + len + ", idx=" + idx + ']');
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(OffheapRawMemory.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
index 7db88bc..f300a18 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
@@ -280,6 +281,9 @@ public class HadoopSkipList extends HadoopMultimapBase {
private final Comparator<Object> cmp;
/** */
+ private final PartiallyOffheapRawComparatorEx<Object> partialRawCmp;
+
+ /** */
private final Random rnd = new GridRandom();
/** */
@@ -298,6 +302,7 @@ public class HadoopSkipList extends HadoopMultimapBase {
keyReader = new Reader(keySer);
cmp = ctx.sortComparator();
+ partialRawCmp = ctx.partialRawSortComparator();
}
/** {@inheritDoc} */
@@ -475,7 +480,14 @@ public class HadoopSkipList extends HadoopMultimapBase {
private int cmp(Object key, long meta) {
assert meta != 0;
- return cmp.compare(key, keyReader.readKey(meta));
+ if (partialRawCmp != null) {
+ long keyPtr = key(meta);
+ int keySize = keySize(keyPtr);
+
+ return partialRawCmp.compare(key, keyPtr + 4, keySize);
+ }
+ else
+ return cmp.compare(key, keyReader.readKey(meta));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
index 0cc9564..a016506 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
@@ -41,8 +41,10 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.io.TextPartiallyRawComparator;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
@@ -161,6 +163,11 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
jobConf.set("mapred.min.split.size", String.valueOf(splitSize));
jobConf.set("mapred.max.split.size", String.valueOf(splitSize));
+ jobConf.setBoolean(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), true);
+
+ jobConf.set(HadoopJobProperty.JOB_PARTIAL_RAW_COMPARATOR.propertyName(),
+ TextPartiallyRawComparator.class.getName());
+
Job job = setupConfig(jobConf);
HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cc495bf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
index 9d1fd4f..1f8978d 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopWritableSerialization;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -84,6 +85,11 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
+ @Override public PartiallyOffheapRawComparatorEx<Object> partialRawSortComparator() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public Comparator<Object> groupComparator() {
return ComparableComparator.getInstance();