You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/13 11:14:58 UTC
[3/4] incubator-carbondata git commit: Added unsafe on-heap/off-heap
sort to improve loading performance
Added unsafe on-heap/off-heap sort to improve loading performance
fixed testcase
fixed row duplicated issue.
rebased and changed the default value
Added file header for code porting.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f1f9348d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f1f9348d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f1f9348d
Branch: refs/heads/master
Commit: f1f9348d0d7150c95500f8f10d3fd3adde47ecb2
Parents: 8940514
Author: ravipesala <ra...@gmail.com>
Authored: Wed Dec 7 00:29:04 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Tue Dec 13 19:13:25 2016 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 32 +
.../core/memory/HeapMemoryAllocator.java | 84 ++
.../carbondata/core/memory/MemoryAllocator.java | 36 +
.../carbondata/core/memory/MemoryBlock.java | 57 ++
.../carbondata/core/memory/MemoryLocation.java | 55 ++
.../core/memory/UnsafeMemoryAllocator.java | 40 +
.../carbondata/core/unsafe/CarbonUnsafe.java | 48 +
dev/javastyle-config.xml | 6 +-
dev/javastyle-suppressions.xml | 35 +
.../newflow/iterator/InputIterator.java | 40 +
.../impl/UnsafeParallelReadMergeSorterImpl.java | 219 +++++
.../newflow/sort/unsafe/IntPointerBuffer.java | 95 ++
.../sort/unsafe/UnsafeCarbonRowPage.java | 356 +++++++
.../sort/unsafe/UnsafeMemoryManager.java | 99 ++
.../newflow/sort/unsafe/UnsafeSortDataRows.java | 356 +++++++
.../unsafe/comparator/UnsafeRowComparator.java | 133 +++
.../UnsafeRowComparatorForNormalDIms.java | 61 ++
.../sort/unsafe/holder/SortTempChunkHolder.java | 35 +
.../sort/unsafe/holder/UnsafeCarbonRow.java | 23 +
.../unsafe/holder/UnsafeCarbonRowForMerge.java | 25 +
.../holder/UnsafeFinalMergePageHolder.java | 90 ++
.../unsafe/holder/UnsafeInmemoryHolder.java | 80 ++
.../holder/UnsafeInmemoryMergeHolder.java | 90 ++
.../holder/UnsafeSortTempFileChunkHolder.java | 455 +++++++++
.../UnsafeInMemoryIntermediateDataMerger.java | 217 +++++
.../merger/UnsafeIntermediateFileMerger.java | 364 +++++++
.../unsafe/merger/UnsafeIntermediateMerger.java | 180 ++++
.../UnsafeSingleThreadFinalSortFilesMerger.java | 313 ++++++
.../newflow/sort/unsafe/sort/TimSort.java | 943 +++++++++++++++++++
.../unsafe/sort/UnsafeIntSortDataFormat.java | 74 ++
.../newflow/steps/SortProcessorStepImpl.java | 10 +-
31 files changed, 4647 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8257756..033b48d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -953,6 +953,38 @@ public final class CarbonCommonConstants {
*/
public static final int CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME = 250;
+ /**
+ * property key to enable the unsafe sort
+ */
+ public static final String ENABLE_UNSAFE_SORT = "enable.unsafe.sort";
+
+ /**
+ * default value used for ENABLE_UNSAFE_SORT
+ */
+ public static final String ENABLE_UNSAFE_SORT_DEFAULT = "false";
+
+ /**
+ * property key to enable the offheap sort
+ */
+ public static final String ENABLE_OFFHEAP_SORT = "enable.offheap.sort";
+
+ /**
+ * default value used for ENABLE_OFFHEAP_SORT
+ */
+ public static final String ENABLE_OFFHEAP_SORT_DEFAULT = "true";
+
+ /**
+ * property key to enable the in-memory merge sort
+ * NOTE(review): the code reading this property is not part of this hunk - confirm semantics
+ */
+ public static final String ENABLE_INMEMORY_MERGE_SORT = "enable.inmemory.merge.sort";
+
+ /**
+ * default value used for ENABLE_INMEMORY_MERGE_SORT
+ */
+ public static final String ENABLE_INMEMORY_MERGE_SORT_DEFAULT = "true";
+
+ /**
+ * property key for the sort chunk size; judging by the key name the value is in MB
+ */
+ public static final String OFFHEAP_SORT_CHUNK_SIZE_IN_MB = "offheap.sort.chunk.size.inmb";
+
+ /**
+ * default value used for OFFHEAP_SORT_CHUNK_SIZE_IN_MB
+ */
+ public static final String OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT = "64";
+
+ /**
+ * property key for the memory usable for sort data; judging by the key name the value is in MB
+ */
+ public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB = "sort.inmemory.size.inmb";
+
+ /**
+ * default value used for IN_MEMORY_FOR_SORT_DATA_IN_MB
+ */
+ public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT = "1024";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
new file mode 100644
index 0000000..67e766c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.carbondata.core.unsafe.CarbonUnsafe;
+
+/**
+ * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package
+ * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
+ */
+public class HeapMemoryAllocator implements MemoryAllocator {
+
+ @GuardedBy("this") private final Map<Long, LinkedList<WeakReference<MemoryBlock>>>
+ bufferPoolsBySize = new HashMap<>();
+
+ private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
+
+ /**
+ * Returns true if allocations of the given size should go through the pooling mechanism and
+ * false otherwise.
+ */
+ private boolean shouldPool(long size) {
+ // Very small allocations are less likely to benefit from pooling.
+ return size >= POOLING_THRESHOLD_BYTES;
+ }
+
+ @Override public MemoryBlock allocate(long size) throws OutOfMemoryError {
+ if (shouldPool(size)) {
+ synchronized (this) {
+ final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+ if (pool != null) {
+ while (!pool.isEmpty()) {
+ final WeakReference<MemoryBlock> blockReference = pool.pop();
+ final MemoryBlock memory = blockReference.get();
+ if (memory != null) {
+ assert (memory.size() == size);
+ return memory;
+ }
+ }
+ bufferPoolsBySize.remove(size);
+ }
+ }
+ }
+ long[] array = new long[(int) ((size + 7) / 8)];
+ return new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size);
+ }
+
+ @Override public void free(MemoryBlock memory) {
+ final long size = memory.size();
+ if (shouldPool(size)) {
+ synchronized (this) {
+ LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+ if (pool == null) {
+ pool = new LinkedList<>();
+ bufferPoolsBySize.put(size, pool);
+ }
+ pool.add(new WeakReference<>(memory));
+ }
+ } else {
+ // Do nothing
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocator.java
new file mode 100644
index 0000000..ccb3986
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+/**
+ * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package
+ * Allocates and releases {@link MemoryBlock}s. Two shared instances are exposed as the
+ * constants {@link #UNSAFE} and {@link #HEAP}.
+ */
+public interface MemoryAllocator {
+
+ /**
+ * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
+ * to be zeroed out. The MemoryBlock class of this package has no zero() method, so a caller
+ * that needs zeroed memory has to clear the block itself.
+ *
+ * @param size number of bytes to allocate
+ * @return the allocated block
+ * @throws OutOfMemoryError if the memory cannot be allocated
+ */
+ MemoryBlock allocate(long size) throws OutOfMemoryError;
+
+ /**
+ * Releases a block previously returned by allocate. The block must be given back to the
+ * same kind of allocator that created it.
+ *
+ * @param memory the block to release
+ */
+ void free(MemoryBlock memory);
+
+ /**
+ * Shared allocator backed by UnsafeMemoryAllocator.
+ */
+ MemoryAllocator UNSAFE = new UnsafeMemoryAllocator();
+
+ /**
+ * Shared allocator backed by HeapMemoryAllocator.
+ */
+ MemoryAllocator HEAP = new HeapMemoryAllocator();
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java b/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java
new file mode 100644
index 0000000..ab9b3d4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+import javax.annotation.Nullable;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package
+ * A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size.
+ */
+public class MemoryBlock extends MemoryLocation {
+
+  /** Size of the block in bytes. */
+  private final long length;
+
+  /**
+   * Optional page number; used when this MemoryBlock represents a page allocated by a
+   * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
+   * which lives in a different package.
+   * NOTE(review): this description comes from the Spark original; confirm which class, if any,
+   * sets the page number in this code base.
+   */
+  public int pageNumber = -1;
+
+  /**
+   * Creates a block of {@code length} bytes starting at the given location.
+   *
+   * @param obj base object the offset is relative to, or null when {@code offset} is an address
+   * @param offset offset inside {@code obj}, or the memory address when {@code obj} is null
+   * @param length size of the block in bytes
+   */
+  public MemoryBlock(@Nullable Object obj, long offset, long length) {
+    super(obj, offset);
+    this.length = length;
+  }
+
+  /**
+   * Returns the size of the memory block.
+   */
+  public long size() {
+    return length;
+  }
+
+  /**
+   * Creates a memory block pointing to the memory used by the long array.
+   *
+   * @param array the array backing the block
+   * @return a block that covers all {@code array.length * 8} bytes of the array
+   */
+  public static MemoryBlock fromLongArray(final long[] array) {
+    // Multiply as long: in int arithmetic array.length * 8 overflows once the array holds
+    // 2^28 or more elements and the block would report a wrong (even negative) size.
+    // NOTE(review): HeapMemoryAllocator takes the array offset from CarbonUnsafe while this
+    // method takes it from Spark's Platform; confirm that both are meant to stay in use.
+    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/core/src/main/java/org/apache/carbondata/core/memory/MemoryLocation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/MemoryLocation.java b/core/src/main/java/org/apache/carbondata/core/memory/MemoryLocation.java
new file mode 100644
index 0000000..0419e04
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/MemoryLocation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+import javax.annotation.Nullable;
+
+/**
+ * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package
+ * Describes a position in memory as a pair of base object and offset. With off-heap
+ * allocation the position is tracked by a memory address, with in-heap allocation by an
+ * offset from a JVM object.
+ */
+public class MemoryLocation {
+
+  /** Base object of the location; may be null. */
+  @Nullable
+  Object obj;
+
+  /** Offset that belongs to {@link #obj}. */
+  long offset;
+
+  /**
+   * Creates a location without base object and with offset 0.
+   */
+  public MemoryLocation() {
+    // Nothing to assign: the fields already hold null and 0.
+  }
+
+  /**
+   * Creates a location from the given base object and offset.
+   */
+  public MemoryLocation(@Nullable Object obj, long offset) {
+    this.offset = offset;
+    this.obj = obj;
+  }
+
+  /**
+   * Returns the base object of this location, possibly null.
+   */
+  public final Object getBaseObject() {
+    return this.obj;
+  }
+
+  /**
+   * Returns the offset of this location.
+   */
+  public final long getBaseOffset() {
+    return this.offset;
+  }
+
+  /**
+   * Points this location at a new base object and offset.
+   */
+  public void setObjAndOffset(Object newObj, long newOffset) {
+    this.offset = newOffset;
+    this.obj = newObj;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
new file mode 100644
index 0000000..ae4cc0a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+import org.apache.carbondata.core.unsafe.CarbonUnsafe;
+
+/**
+ * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package
+ * {@link MemoryAllocator} that obtains off-heap memory through {@code Unsafe}. Blocks created
+ * here carry no base object; their offset is the address returned by {@code allocateMemory}.
+ */
+public class UnsafeMemoryAllocator implements MemoryAllocator {
+
+  /**
+   * Reserves {@code size} bytes of off-heap memory and wraps the address in a block.
+   */
+  @Override
+  public MemoryBlock allocate(long size) throws OutOfMemoryError {
+    final long startAddress = CarbonUnsafe.unsafe.allocateMemory(size);
+    final MemoryBlock block = new MemoryBlock(null, startAddress, size);
+    return block;
+  }
+
+  /**
+   * Releases the off-heap memory the block points to. With assertions enabled, a block that
+   * carries a base object is rejected.
+   */
+  @Override
+  public void free(MemoryBlock memory) {
+    final Object base = memory.getBaseObject();
+    assert (base == null) :
+        "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
+    final long address = memory.getBaseOffset();
+    CarbonUnsafe.unsafe.freeMemory(address);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java b/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java
new file mode 100644
index 0000000..89ac14a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.unsafe;
+
+import java.lang.reflect.Field;
+
+import sun.misc.Unsafe;
+
+
+/**
+ * Gives access to the {@code sun.misc.Unsafe} instance of the JVM and to the base offsets of
+ * byte and long arrays. When the instance cannot be obtained, {@link #unsafe} is null and both
+ * offsets are 0.
+ */
+public final class CarbonUnsafe {
+
+  /** Base offset of the first element of a {@code byte[]}, or 0 without Unsafe. */
+  public static final int BYTE_ARRAY_OFFSET;
+
+  /** Base offset of the first element of a {@code long[]}, or 0 without Unsafe. */
+  public static final int LONG_ARRAY_OFFSET;
+
+  /** The Unsafe instance, or null when it could not be obtained reflectively. */
+  public static Unsafe unsafe;
+
+  static {
+    Unsafe instance;
+    try {
+      // Unsafe keeps its singleton in the private static field "theUnsafe".
+      final Field singletonField = Unsafe.class.getDeclaredField("theUnsafe");
+      singletonField.setAccessible(true);
+      instance = (Unsafe) singletonField.get(null);
+    } catch (Throwable ignored) {
+      // Deliberate best effort: any failure leaves the class usable with unsafe == null.
+      instance = null;
+    }
+    unsafe = instance;
+
+    final boolean available = instance != null;
+    BYTE_ARRAY_OFFSET = available ? instance.arrayBaseOffset(byte[].class) : 0;
+    LONG_ARRAY_OFFSET = available ? instance.arrayBaseOffset(long[].class) : 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/dev/javastyle-config.xml
----------------------------------------------------------------------
diff --git a/dev/javastyle-config.xml b/dev/javastyle-config.xml
index e77c9fd..6c9b50c 100644
--- a/dev/javastyle-config.xml
+++ b/dev/javastyle-config.xml
@@ -48,9 +48,9 @@
<property name="fileExtensions" value="java, properties, xml"/>
- <!--module name="SuppressionFilter">
- <property name="file" value="dev/checkstyle-suppressions.xml"/>
- </module-->
+ <module name="SuppressionFilter">
+ <property name="file" value="dev/javastyle-suppressions.xml"/>
+ </module>
<!-- Checks for whitespace -->
<!-- See http://checkstyle.sf.net/config_whitespace.html -->
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/dev/javastyle-suppressions.xml
----------------------------------------------------------------------
diff --git a/dev/javastyle-suppressions.xml b/dev/javastyle-suppressions.xml
new file mode 100644
index 0000000..9780dcb
--- /dev/null
+++ b/dev/javastyle-suppressions.xml
@@ -0,0 +1,35 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE suppressions PUBLIC
+"-//Puppy Crawl//DTD Suppressions 1.1//EN"
+"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
+
+<!--
+
+ This file contains suppression rules for Checkstyle checks.
+ Ideally only files that cannot be modified (e.g. third-party code)
+ should be added here. All other violations should be fixed.
+
+-->
+
+<suppressions>
+ <suppress checks=".*"
+ files="org/apache/carbondata/processing/newflow/sort/unsafe/sort/TimSort.java"/>
+ <suppress checks=".*"
+ files="org/apache/carbondata/core/memory/HeapMemoryAllocator"/>
+</suppressions>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/iterator/InputIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/iterator/InputIterator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/iterator/InputIterator.java
new file mode 100644
index 0000000..a87aaa2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/iterator/InputIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.iterator;
+
+import java.util.Iterator;
+
+/**
+ * Base class for iterators over the data of record readers. Besides the {@link Iterator}
+ * contract it adds an explicit life cycle through {@link #initialize()} and {@link #close()};
+ * removing elements is not supported.
+ */
+public abstract class InputIterator<E> implements Iterator<E> {
+
+  /**
+   * Prepares the iterator for use.
+   */
+  public abstract void initialize();
+
+  /**
+   * Releases the resources held by the iterator.
+   */
+  public abstract void close();
+
+  /**
+   * Always fails, elements cannot be removed through this iterator.
+   *
+   * @throws UnsupportedOperationException on every call
+   */
+  @Override
+  public void remove() {
+    final String reason = "Not permitted";
+    throw new UnsupportedOperationException(reason);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
new file mode 100644
index 0000000..3a29647
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
+import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads the data from an array of iterators in parallel and does a merge sort.
+ * Every input iterator is drained by its own thread into a shared {@link UnsafeSortDataRows};
+ * after all threads are done the sorted pages are handed to the final merger, whose output is
+ * exposed as a single iterator of row batches.
+ */
+public class UnsafeParallelReadMergeSorterImpl implements Sorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeParallelReadMergeSorterImpl.class.getName());
+
+  private SortParameters sortParameters;
+
+  private UnsafeIntermediateMerger unsafeIntermediateFileMerger;
+
+  private ExecutorService executorService;
+
+  private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
+
+  private DataField[] inputDataFields;
+
+  public UnsafeParallelReadMergeSorterImpl(DataField[] inputDataFields) {
+    this.inputDataFields = inputDataFields;
+  }
+
+  /**
+   * Creates the intermediate and the final merger for the given sort parameters. It has to be
+   * called before {@link #sort(Iterator[])}.
+   */
+  @Override public void initialize(SortParameters sortParameters) {
+    this.sortParameters = sortParameters;
+    unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
+    // NOTE(review): the two locations computed below are never used in this class. They are
+    // kept because it is not visible here whether getLocalDataFolderLocation has side effects.
+    String storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+            String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(),
+            sortParameters.getSegmentId() + "", false);
+    // Set the data file location
+    String dataFolderLocation =
+        storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
+    finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters);
+  }
+
+  /**
+   * Reads all iterators in parallel (one thread per iterator), sorts the rows and starts the
+   * final merge.
+   *
+   * @param iterators the input iterators, one reader thread is started for each of them
+   * @return an array with one iterator that returns the sorted rows in batches
+   * @throws CarbonDataLoadingException if reading, sorting or merging fails, if any of the
+   *                                    reader threads fails, or if the wait for them is
+   *                                    interrupted or times out
+   */
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    UnsafeSortDataRows sortDataRow =
+        new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
+    try {
+      sortDataRow.initialize();
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    this.executorService = Executors.newFixedThreadPool(iterators.length);
+
+    try {
+      List<java.util.concurrent.Future<Void>> results =
+          new java.util.ArrayList<>(iterators.length);
+      for (Iterator<CarbonRowBatch> iterator : iterators) {
+        results.add(executorService
+            .submit(new SortIteratorThread(iterator, sortDataRow, sortParameters, batchSize)));
+      }
+      executorService.shutdown();
+      if (!executorService.awaitTermination(2, TimeUnit.DAYS)) {
+        throw new java.util.concurrent.TimeoutException(
+            "Reading and sorting the input rows did not finish within 2 days");
+      }
+      // A failure inside a reader thread only becomes visible through its future. Without
+      // this check the load would silently continue with the rows read so far.
+      for (java.util.concurrent.Future<Void> result : results) {
+        result.get();
+      }
+      processRowToNextStep(sortDataRow, sortParameters);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller and stop the reader threads.
+      Thread.currentThread().interrupt();
+      executorService.shutdownNow();
+      throw new CarbonDataLoadingException(
+          "Interrupted while reading and sorting the input rows", e);
+    } catch (CarbonDataLoadingException e) {
+      executorService.shutdownNow();
+      throw e;
+    } catch (Exception e) {
+      executorService.shutdownNow();
+      throw new CarbonDataLoadingException("Problem while reading and sorting the input rows", e);
+    }
+    try {
+      unsafeIntermediateFileMerger.finish();
+      List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
+      finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
+          unsafeIntermediateFileMerger.getMergedPages());
+    } catch (CarbonDataWriterException e) {
+      throw new CarbonDataLoadingException(e);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    // Creates the iterator to read from merge sorter.
+    Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() {
+
+      @Override public boolean hasNext() {
+        return finalMerger.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        // Collect at most batchSize rows of the final merger into one batch.
+        int counter = 0;
+        CarbonRowBatch rowBatch = new CarbonRowBatch();
+        while (finalMerger.hasNext() && counter < batchSize) {
+          rowBatch.addRow(new CarbonRow(finalMerger.next()));
+          counter++;
+        }
+        return rowBatch;
+      }
+    };
+    return new Iterator[] { batchIterator };
+  }
+
+  /**
+   * Closes the intermediate merger and clears the final merger.
+   */
+  @Override public void close() {
+    unsafeIntermediateFileMerger.close();
+    finalMerger.clear();
+  }
+
+  /**
+   * Below method will be used to process data to next step: it starts the sorting of the rows
+   * added so far and records the load statistics.
+   *
+   * @return always false
+   */
+  private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      // start sorting
+      sortDataRows.startSorting();
+
+      // check any more rows are present
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      return false;
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  /**
+   * This thread iterates the iterator and adds the rows to the sorter in batches.
+   */
+  private static class SortIteratorThread implements Callable<Void> {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private UnsafeSortDataRows sortDataRows;
+
+    private SortParameters parameters;
+
+    /** Reused between batches; it is handed to the sorter together with the filled count. */
+    private Object[][] buffer;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, UnsafeSortDataRows sortDataRows,
+        SortParameters parameters, int batchSize) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+      this.parameters = parameters;
+      this.buffer = new Object[batchSize][];
+    }
+
+    /**
+     * Drains the iterator. Null rows are skipped, all other rows are passed on to the sorter.
+     *
+     * @throws CarbonDataLoadingException if reading or adding the rows fails
+     */
+    @Override public Void call() throws CarbonDataLoadingException {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
+          int i = 0;
+          while (batchIterator.hasNext()) {
+            CarbonRow row = batchIterator.next();
+            if (row == null) {
+              continue;
+            }
+            // A batch may hold more rows than the buffer was sized for; hand over the full
+            // buffer first instead of writing past its end.
+            if (i == buffer.length && i > 0) {
+              sortDataRows.addRowBatch(buffer, i);
+              i = 0;
+            }
+            buffer[i++] = row.getData();
+          }
+          if (i > 0) {
+            sortDataRows.addRowBatch(buffer, i);
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        throw new CarbonDataLoadingException(e);
+      }
+      return null;
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/IntPointerBuffer.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/IntPointerBuffer.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/IntPointerBuffer.java
new file mode 100644
index 0000000..94674cf
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/IntPointerBuffer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.unsafe;
+
+import org.apache.carbondata.core.memory.MemoryBlock;
+
+/**
+ * Holds the pointers for rows.
+ */
+public class IntPointerBuffer {
+
+ private int length;
+
+ private int actualSize;
+
+
+ private int[] pointerBlock;
+
+ private MemoryBlock baseBlock;
+
+ public IntPointerBuffer(MemoryBlock baseBlock) {
+ // TODO can be configurable, it is initial size and it can grow automatically.
+ this.length = 100000;
+ pointerBlock = new int[length];
+ this.baseBlock = baseBlock;
+ }
+
+ public IntPointerBuffer(int length) {
+ this.length = length;
+ pointerBlock = new int[length];
+ }
+
+ public void set(int index, int value) {
+ pointerBlock[index] = value;
+ }
+
+ public void set(int value) {
+ ensureMemory();
+ pointerBlock[actualSize] = value;
+ actualSize++;
+ }
+
+ /**
+ * Returns the value at position {@code index}.
+ */
+ public int get(int index) {
+ assert index >= 0 : "index (" + index + ") should >= 0";
+ assert index < length : "index (" + index + ") should < length (" + length + ")";
+ return pointerBlock[index];
+ }
+
+ public int getActualSize() {
+ return actualSize;
+ }
+
+ public MemoryBlock getBaseBlock() {
+ return baseBlock;
+ }
+
+ public int[] getPointerBlock() {
+ return pointerBlock;
+ }
+
+ private void ensureMemory() {
+ if (actualSize >= length) {
+ // Expand by quarter, may be we can correct the logic later
+ int localLength = length + (int) (length * (0.25));
+ int[] memoryAddress = new int[localLength];
+ System.arraycopy(pointerBlock, 0, memoryAddress, 0, length);
+ pointerBlock = memoryAddress;
+ length = localLength;
+ }
+ }
+
+ public void freeMemory() {
+ pointerBlock = null;
+ if (baseBlock != null) {
+ UnsafeMemoryManager.INSTANCE.freeMemory(baseBlock);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
new file mode 100644
index 0000000..7296e74
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.unsafe;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.unsafe.CarbonUnsafe;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/**
+ * It can keep the data of prescribed size data in offheap/onheap memory and returns it when needed
+ */
+public class UnsafeCarbonRowPage {
+
+ private boolean[] noDictionaryDimensionMapping;
+
+ private int dimensionSize;
+
+ private int measureSize;
+
+ private char[] aggType;
+
+ private long[] nullSetWords;
+
+ private IntPointerBuffer buffer;
+
+ private int lastSize;
+
+ private long sizeToBeUsed;
+
+ private MemoryBlock dataBlock;
+
+ private boolean saveToDisk;
+
+ /**
+ * Creates a page that stores rows inside the given memory block.
+ *
+ * @param noDictionaryDimensionMapping per leading dimension: true when the column is stored
+ * as length-prefixed bytes, false when it is stored as an int
+ * @param dimensionSize total number of dimension columns in a row
+ * @param measureSize number of measure columns in a row
+ * @param aggType per measure type marker, compared against the measure constants of
+ * CarbonCommonConstants when rows are written and read
+ * @param memoryBlock block that receives the row bytes
+ * @param saveToDisk flag exposed through isSaveToDisk()
+ */
+ public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping, int dimensionSize,
+ int measureSize, char[] aggType, MemoryBlock memoryBlock, boolean saveToDisk) {
+ this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
+ this.dimensionSize = dimensionSize;
+ this.measureSize = measureSize;
+ this.aggType = aggType;
+ this.saveToDisk = saveToDisk;
+ // one bit per measure, packed into 64-bit words
+ this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
+ buffer = new IntPointerBuffer(memoryBlock);
+ this.dataBlock = buffer.getBaseBlock();
+ // TODO Only using 95% of space (5% is kept back) for safe side.May be we can have
+ // different logic.
+ sizeToBeUsed = dataBlock.size() - (dataBlock.size() * 5) / 100;
+ }
+
+ /**
+ * Appends a row at the current end of the page and records its offset in the pointer
+ * buffer. This method does not check capacity itself; see canAdd().
+ */
+ public void addRow(Object[] row) {
+ int size = addRow(row, dataBlock.getBaseOffset() + lastSize);
+ // the pointer is the offset relative to the block base, not the absolute address
+ buffer.set(lastSize);
+ lastSize = lastSize + size;
+ }
+
+ /**
+ * Returns an iterator that materializes the rows in the current order of the pointer buffer.
+ */
+ public Iterator<Object[]> getIterator() {
+ return new UnsafeIterator();
+ }
+
+ /**
+ * Serializes one row into the data block starting at the given address.
+ * Layout written, in order: the dimensions covered by noDictionaryDimensionMapping
+ * (2-byte length + bytes when the flag is true, 4-byte int otherwise), the remaining
+ * dimensions up to dimensionSize (2-byte length + bytes), the measure bitset words, then
+ * the non-null measures.
+ *
+ * @param row row values, dimensions first and measures after them
+ * @param address address (block base offset + offset inside the page) of the first byte
+ * @return number of bytes written for this row
+ */
+ private int addRow(Object[] row, long address) {
+ if (row == null) {
+ throw new RuntimeException("Row is null ??");
+ }
+ int dimCount = 0;
+ int size = 0;
+ Object baseObject = dataBlock.getBaseObject();
+ for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+ if (noDictionaryDimensionMapping[dimCount]) {
+ byte[] col = (byte[]) row[dimCount];
+ // the length is narrowed to a short, so values above Short.MAX_VALUE do not round-trip
+ CarbonUnsafe.unsafe
+ .putShort(baseObject, address + size, (short) col.length);
+ size += 2;
+ CarbonUnsafe.unsafe.copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, col.length);
+ size += col.length;
+ } else {
+ int value = (int) row[dimCount];
+ CarbonUnsafe.unsafe.putInt(baseObject, address + size, value);
+ size += 4;
+ }
+ }
+
+ // write complex dimensions here.
+ for (; dimCount < dimensionSize; dimCount++) {
+ byte[] col = (byte[]) row[dimCount];
+ CarbonUnsafe.unsafe.putShort(baseObject, address + size, (short) col.length);
+ size += 2;
+ CarbonUnsafe.unsafe.copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, col.length);
+ size += col.length;
+ }
+ // reserve the slot for the bitset now; its content is only known after the measure loop
+ Arrays.fill(nullSetWords, 0);
+ int nullSetSize = nullSetWords.length * 8;
+ int nullWordLoc = size;
+ size += nullSetSize;
+ for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+ Object value = row[mesCount + dimensionSize];
+ if (null != value) {
+ if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ Double val = (Double) value;
+ CarbonUnsafe.unsafe.putDouble(baseObject, address + size, val);
+ size += 8;
+ } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
+ Long val = (Long) value;
+ CarbonUnsafe.unsafe.putLong(baseObject, address + size, val);
+ size += 8;
+ } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+ BigDecimal val = (BigDecimal) value;
+ byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+ CarbonUnsafe.unsafe.putShort(baseObject, address + size,
+ (short) bigDecimalInBytes.length);
+ size += 2;
+ CarbonUnsafe.unsafe
+ .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ }
+ // a set bit means "value present"; it is set even when no branch above matched
+ set(nullSetWords, mesCount);
+ } else {
+ // null measures occupy no bytes, only a cleared bit
+ unset(nullSetWords, mesCount);
+ }
+ }
+ // back-fill the bitset into the slot reserved before the measures
+ CarbonUnsafe.unsafe.copyMemory(nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, baseObject,
+ address + nullWordLoc, nullSetSize);
+ return size;
+ }
+
+ /**
+ * Reads the row stored at the given address back into an object array, mirroring the
+ * layout produced when the row was added.
+ * NOTE(review): the instance field nullSetWords is used as scratch space here, so
+ * concurrent calls on the same page would overwrite each other's bitset - confirm callers.
+ *
+ * @param address address (block base offset + row offset) of the first byte of the row
+ * @param rowToFill array that receives the values, dimensions first and measures after them
+ * @return the same rowToFill instance
+ */
+ public Object[] getRow(long address, Object[] rowToFill) {
+ int dimCount = 0;
+ int size = 0;
+
+ Object baseObject = dataBlock.getBaseObject();
+ for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+ if (noDictionaryDimensionMapping[dimCount]) {
+ short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.unsafe
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ col.length);
+ size += col.length;
+ rowToFill[dimCount] = col;
+ } else {
+ int anInt = CarbonUnsafe.unsafe.getInt(baseObject, address + size);
+ size += 4;
+ rowToFill[dimCount] = anInt;
+ }
+ }
+
+ // read complex dimensions here.
+ for (; dimCount < dimensionSize; dimCount++) {
+ short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.unsafe
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+ size += col.length;
+ rowToFill[dimCount] = col;
+ }
+
+ // load the measure bitset stored between the dimensions and the measures
+ int nullSetSize = nullSetWords.length * 8;
+ Arrays.fill(nullSetWords, 0);
+ CarbonUnsafe.unsafe
+ .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
+ nullSetSize);
+ size += nullSetSize;
+
+ for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+ // only measures whose bit is set have bytes stored
+ if (isSet(nullSetWords, mesCount)) {
+ if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ Double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
+ size += 8;
+ rowToFill[dimensionSize + mesCount] = val;
+ } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
+ Long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
+ size += 8;
+ rowToFill[dimensionSize + mesCount] = val;
+ } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+ short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
+ byte[] bigDecimalInBytes = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+ BigDecimal val = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+ size += bigDecimalInBytes.length;
+ rowToFill[dimensionSize + mesCount] = val;
+ }
+ } else {
+ rowToFill[dimensionSize + mesCount] = null;
+ }
+ }
+ return rowToFill;
+ }
+
+ /**
+ * Copies the row stored at the given address to the stream, field by field and in the
+ * same order as it is laid out in memory: dimensions, measure bitset words, then the
+ * measures whose bit is set. Measures whose bit is clear contribute no bytes.
+ * NOTE(review): the instance field nullSetWords is used as scratch space here, so
+ * concurrent calls on the same page would overwrite each other's bitset - confirm callers.
+ *
+ * @param address address (block base offset + row offset) of the first byte of the row
+ * @param stream destination stream
+ * @throws IOException if writing to the stream fails
+ */
+ public void fillRow(long address, DataOutputStream stream) throws IOException {
+ int dimCount = 0;
+ int size = 0;
+
+ Object baseObject = dataBlock.getBaseObject();
+ for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+ if (noDictionaryDimensionMapping[dimCount]) {
+ short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.unsafe
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+ col.length);
+ size += col.length;
+ stream.writeShort(aShort);
+ stream.write(col);
+ } else {
+ int anInt = CarbonUnsafe.unsafe.getInt(baseObject, address + size);
+ size += 4;
+ stream.writeInt(anInt);
+ }
+ }
+
+ // copy complex dimensions here.
+ for (; dimCount < dimensionSize; dimCount++) {
+ short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
+ byte[] col = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.unsafe
+ .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
+ size += col.length;
+ stream.writeShort(aShort);
+ stream.write(col);
+ }
+
+ // load the measure bitset and forward it word by word
+ int nullSetSize = nullSetWords.length * 8;
+ Arrays.fill(nullSetWords, 0);
+ CarbonUnsafe.unsafe
+ .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
+ nullSetSize);
+ size += nullSetSize;
+ for (int i = 0; i < nullSetWords.length; i++) {
+ stream.writeLong(nullSetWords[i]);
+ }
+
+ for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+ if (isSet(nullSetWords, mesCount)) {
+ if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
+ size += 8;
+ stream.writeDouble(val);
+ } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
+ long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
+ size += 8;
+ stream.writeLong(val);
+ } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+ short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
+ byte[] bigDecimalInBytes = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ stream.writeShort(aShort);
+ stream.write(bigDecimalInBytes);
+ }
+ }
+ }
+ }
+
+ /**
+ * Reads the row at the given address into a freshly allocated array sized for all
+ * dimensions and measures.
+ */
+ private Object[] getRow(long address) {
+ Object[] row = new Object[dimensionSize + measureSize];
+ return getRow(address, row);
+ }
+
+ /**
+ * Releases the page by freeing the pointer buffer, which in turn hands the data block
+ * back to the memory manager.
+ */
+ public void freeMemory() {
+ buffer.freeMemory();
+ }
+
+ /**
+ * Returns the saveToDisk flag given at construction time.
+ */
+ public boolean isSaveToDisk() {
+ return saveToDisk;
+ }
+
+ /**
+ * Returns the buffer holding one offset per stored row.
+ */
+ public IntPointerBuffer getBuffer() {
+ return buffer;
+ }
+
+ /**
+ * Returns the number of bytes written to the page so far.
+ */
+ public int getUsedSize() {
+ return lastSize;
+ }
+
+ /**
+ * Returns true while the bytes written so far are below the usable threshold. The check
+ * does not know the size of the next row, so a row accepted here may end past the threshold.
+ */
+ public boolean canAdd() {
+ return lastSize < sizeToBeUsed;
+ }
+
+ /**
+ * Returns the memory block that holds the row bytes.
+ */
+ public MemoryBlock getDataBlock() {
+ return dataBlock;
+ }
+
+ /**
+  * Iterator over the rows of this page in pointer-buffer order. The number of rows is
+  * captured when the iterator is created.
+  */
+ class UnsafeIterator extends CarbonIterator<Object[]> {
+
+   // index of the next pointer to read
+   private int cursor;
+
+   // number of pointers present at creation time
+   private int rowCount;
+
+   public UnsafeIterator() {
+     this.rowCount = buffer.getActualSize();
+   }
+
+   @Override public boolean hasNext() {
+     return cursor < rowCount;
+   }
+
+   @Override public Object[] next() {
+     final int position = cursor;
+     long rowOffset = buffer.get(position);
+     cursor = position + 1;
+     // stored pointers are relative to the block base
+     return getRow(rowOffset + dataBlock.getBaseOffset());
+   }
+ }
+
+ /**
+ * Sets bit {@code index} in a bitset packed into 64-bit words. The word is selected with
+ * index >> 6; the shift by the full index is fine because Java uses only the low six bits
+ * of the shift distance for long operands.
+ */
+ public static void set(long[] words, int index) {
+ int wordOffset = (index >> 6);
+ words[wordOffset] |= (1L << index);
+ }
+
+ /**
+ * Clears bit {@code index} in a bitset packed into 64-bit words.
+ */
+ public static void unset(long[] words, int index) {
+ int wordOffset = (index >> 6);
+ words[wordOffset] &= ~(1L << index);
+ }
+
+ /**
+ * Returns true when bit {@code index} is set in a bitset packed into 64-bit words.
+ */
+ public static boolean isSet(long[] words, int index) {
+ int wordOffset = (index >> 6);
+ return ((words[wordOffset] & (1L << index)) != 0);
+ }
+
+ /**
+ * Returns the mapping array given at construction time (not a copy).
+ */
+ public boolean[] getNoDictionaryDimensionMapping() {
+ return noDictionaryDimensionMapping;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
new file mode 100644
index 0000000..f844fb5
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.unsafe;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.MemoryAllocator;
+import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Manages memory for instance.
+ */
+public class UnsafeMemoryManager {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(UnsafeMemoryManager.class.getName());
+
+ static {
+ long size = Long.parseLong(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
+ CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
+
+ boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+ CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
+ long takenSize = size * 1024 * 1024;
+ MemoryAllocator allocator;
+ if (offHeap) {
+ allocator = MemoryAllocator.UNSAFE;
+ } else {
+ long maxMemory = Runtime.getRuntime().maxMemory()*60/100;
+ if (takenSize > maxMemory) {
+ takenSize = maxMemory;
+ }
+ allocator = MemoryAllocator.HEAP;
+ }
+ INSTANCE = new UnsafeMemoryManager(takenSize, allocator);
+ }
+
+ public static final UnsafeMemoryManager INSTANCE;
+
+ private long totalMemory;
+
+ private long memoryUsed;
+
+ private MemoryAllocator allocator;
+
+ private long minimumMemory;
+
+ private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
+ this.totalMemory = totalMemory;
+ this.allocator = allocator;
+ minimumMemory = (long) (totalMemory * ((double)10/100));
+ LOGGER.audit("Memory manager is created with size " + totalMemory
+ +" with "+allocator +" and minimum reserve memory "+minimumMemory);
+ }
+ public synchronized MemoryBlock allocateMemory(long memoryRequested) {
+ if (memoryUsed + memoryRequested <= totalMemory) {
+ MemoryBlock allocate = allocator.allocate(memoryRequested);
+ memoryUsed += allocate.size();
+ LOGGER.audit("Memory block is created with size " + allocate.size() +
+ " Total memory used " + memoryUsed + " memory left " + (getAvailableMemory()));
+ return allocate;
+ }
+ return null;
+ }
+
+ public synchronized void freeMemory(MemoryBlock memoryBlock) {
+ allocator.free(memoryBlock);
+ memoryUsed -= memoryBlock.size();
+ memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
+ LOGGER.audit("Memory released, memory used "+ memoryUsed
+ + " memory left "+(getAvailableMemory()));
+ }
+
+ public synchronized long getAvailableMemory() {
+ return totalMemory - memoryUsed;
+ }
+
+ public boolean isMemoryAvailable() {
+ return getAvailableMemory() > minimumMemory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
new file mode 100644
index 0000000..9d73ba2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort.unsafe;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparator;
+import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms;
+import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
+import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.newflow.sort.unsafe.sort.TimSort;
+import org.apache.carbondata.processing.newflow.sort.unsafe.sort.UnsafeIntSortDataFormat;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortTempFileChunkWriter;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriter;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriterFactory;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class UnsafeSortDataRows {
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(UnsafeSortDataRows.class.getName());
+ /**
+ * threadStatusObserver
+ */
+ private ThreadStatusObserver threadStatusObserver;
+ /**
+ * executor service for data sort holder
+ */
+ private ExecutorService dataSorterAndWriterExecutorService;
+ /**
+ * sort configuration for this load (temp file location, column counts, write buffer size)
+ */
+
+ private SortParameters parameters;
+
+ private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger;
+
+ private UnsafeCarbonRowPage rowPage;
+
+ private final Object addRowsLock = new Object();
+
+ private int inMemoryChunkSizeInMB;
+
+ private boolean enableInMemoryIntermediateMerge;
+
+ /**
+  * Creates the sorter and reads the page size and the in-memory merge switch from the
+  * carbon properties. No memory is allocated until {@link #initialize()} is called.
+  *
+  * @param parameters sort configuration of the current load
+  * @param unsafeInMemoryIntermediateFileMerger merger that receives sorted pages and files
+  */
+ public UnsafeSortDataRows(SortParameters parameters,
+     UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger) {
+   this.parameters = parameters;
+   this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger;
+
+   // observer of writing file in thread
+   this.threadStatusObserver = new ThreadStatusObserver();
+
+   String chunkSizeInMB = CarbonProperties.getInstance()
+       .getProperty(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB,
+           CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT);
+   this.inMemoryChunkSizeInMB = Integer.parseInt(chunkSizeInMB);
+
+   String inMemoryMergeSetting = CarbonProperties.getInstance()
+       .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
+           CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT);
+   this.enableInMemoryIntermediateMerge = Boolean.parseBoolean(inMemoryMergeSetting);
+ }
+
+ /**
+  * Prepares the sorter for a load: allocates the first row page, clears and recreates
+  * the sort temp directory and creates the thread pool that sorts filled pages.
+  *
+  * @throws CarbonSortKeyAndGroupByException if no memory block could be obtained or the
+  *         old sort temp location could not be deleted
+  */
+ public void initialize() throws CarbonSortKeyAndGroupByException {
+   // multiply as long: int arithmetic wraps around for chunk sizes of 2048 MB or more
+   MemoryBlock baseBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024L * 1024L);
+   this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+       parameters.getDimColCount(), parameters.getMeasureColCount(), parameters.getAggType(),
+       baseBlock, !UnsafeMemoryManager.INSTANCE.isMemoryAvailable());
+   // Delete if any older file exists in sort temp folder
+   deleteSortLocationIfExists();
+
+   // create new sort temp directory
+   if (!new File(parameters.getTempFileLocation()).mkdirs()) {
+     LOGGER.info("Sort Temp Location Already Exists");
+   }
+   this.dataSorterAndWriterExecutorService =
+       Executors.newFixedThreadPool(parameters.getNumberOfCores());
+ }
+
+ /**
+  * Requests a block of the given size from the memory manager, retrying up to 100 times
+  * with a 50 ms pause after every refused request.
+  *
+  * @param size requested block size in bytes
+  * @return the allocated block, never null
+  * @throws CarbonSortKeyAndGroupByException if every attempt was refused or the waiting
+  *         thread was interrupted
+  */
+ public static MemoryBlock getMemoryBlock(long size) throws CarbonSortKeyAndGroupByException {
+   for (int tries = 0; tries < 100; tries++) {
+     MemoryBlock baseBlock = UnsafeMemoryManager.INSTANCE.allocateMemory(size);
+     if (baseBlock != null) {
+       return baseBlock;
+     }
+     try {
+       Thread.sleep(50);
+     } catch (InterruptedException e) {
+       // keep the interrupt visible to callers further up the stack
+       Thread.currentThread().interrupt();
+       throw new CarbonSortKeyAndGroupByException(e);
+     }
+   }
+   throw new CarbonSortKeyAndGroupByException("Not enough memory to create page");
+ }
+
+ /**
+  * Adds the first {@code size} rows of the batch to the current row page. When the page
+  * is full it is submitted for sorting and a new page is allocated for the remaining
+  * rows. Calls are serialized on an internal lock.
+  *
+  * @param rowBatch rows to add
+  * @param size number of leading entries of rowBatch that are valid
+  * @throws CarbonSortKeyAndGroupByException if handing over the full page, allocating
+  *         the next page or adding the row to the new page fails
+  */
+ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
+   synchronized (addRowsLock) {
+     for (int i = 0; i < size; i++) {
+       if (rowPage.canAdd()) {
+         rowPage.addRow(rowBatch[i]);
+       } else {
+         try {
+           if (enableInMemoryIntermediateMerge) {
+             unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
+           }
+           unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
+           dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
+           // multiply as long: int arithmetic wraps around for chunk sizes of 2048 MB or more
+           MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024L * 1024L);
+           boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
+           rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+               parameters.getDimColCount(), parameters.getMeasureColCount(),
+               parameters.getAggType(), memoryBlock,
+               saveToDisk);
+           rowPage.addRow(rowBatch[i]);
+         } catch (Exception e) {
+           // no semaphore is involved here; describe the real step and keep the stack trace
+           LOGGER.error(e,
+               "exception occurred while handing over a full row page and allocating a new one");
+           throw new CarbonSortKeyAndGroupByException(e);
+         }
+
+       }
+     }
+   }
+ }
+
+ /**
+  * Finishes the input side of the sort. The page that is still being filled is sorted in
+  * the calling thread and handed to the intermediate merger, or released when it holds
+  * no data. The method then waits for all submitted page-sort tasks to finish.
+  *
+  * @throws CarbonSortKeyAndGroupByException for example when waiting for the sorter
+  *         tasks is interrupted
+  */
+ public void startSorting() throws CarbonSortKeyAndGroupByException {
+   LOGGER.info("Unsafe based sorting will be used");
+   if (this.rowPage.getUsedSize() <= 0) {
+     rowPage.freeMemory();
+   } else {
+     TimSort<UnsafeCarbonRow, IntPointerBuffer> sorter =
+         new TimSort<>(new UnsafeIntSortDataFormat(rowPage));
+     IntPointerBuffer pointers = rowPage.getBuffer();
+     if (parameters.getNoDictionaryCount() > 0) {
+       sorter.sort(pointers, 0, pointers.getActualSize(), new UnsafeRowComparator(rowPage));
+     } else {
+       sorter.sort(pointers, 0, pointers.getActualSize(),
+           new UnsafeRowComparatorForNormalDIms(parameters.getDimColCount(), rowPage));
+     }
+     unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
+   }
+   startFileBasedMerge();
+ }
+
+ /**
+ * Writes the given rows to a sort temp file through the writer returned by getWriter().
+ * The writer is always finished, also on failure.
+ * NOTE(review): no call to this private method is visible in this class.
+ *
+ * @param recordHolderList rows to write
+ * @param entryCountLocal entry count passed to the writer on initialization
+ * @param file destination file
+ * @throws CarbonSortKeyAndGroupByException if the writer fails
+ */
+ private void writeSortTempFile(Object[][] recordHolderList, int entryCountLocal, File file)
+ throws CarbonSortKeyAndGroupByException {
+ TempSortFileWriter writer = null;
+
+ try {
+ writer = getWriter();
+ writer.initiaize(file, entryCountLocal);
+ writer.writeSortTempFile(recordHolderList);
+ } catch (CarbonSortKeyAndGroupByException e) {
+ LOGGER.error(e, "Problem while writing the sort temp file");
+ throw e;
+ } finally {
+ if (writer != null) {
+ writer.finish();
+ }
+ }
+ }
+
+ /**
+  * Writes all rows of the page to the file: first the row count as an int, then every
+  * row in pointer-buffer order. The stream is closed in all cases.
+  *
+  * @param rowPage page to write
+  * @param file destination file
+  * @throws CarbonSortKeyAndGroupByException if an I/O error occurs
+  */
+ private void writeData(UnsafeCarbonRowPage rowPage, File file)
+     throws CarbonSortKeyAndGroupByException {
+   DataOutputStream stream = null;
+   try {
+     // open stream
+     FileOutputStream fileStream = new FileOutputStream(file);
+     stream = new DataOutputStream(
+         new BufferedOutputStream(fileStream, parameters.getFileWriteBufferSize()));
+     IntPointerBuffer pointers = rowPage.getBuffer();
+     int rowCount = pointers.getActualSize();
+     // write number of entries to the file
+     stream.writeInt(rowCount);
+     MemoryBlock block = rowPage.getDataBlock();
+     int index = 0;
+     while (index < rowCount) {
+       // pointers are relative to the block base
+       rowPage.fillRow(pointers.get(index) + block.getBaseOffset(), stream);
+       index++;
+     }
+   } catch (IOException e) {
+     throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
+   } finally {
+     // close streams
+     CarbonUtil.closeStreams(stream);
+   }
+ }
+
+ /**
+  * Creates the temp file writer for the configured column layout and wraps it in a chunk
+  * writer. The chunk size is the sort buffer size when prefetch is on and compression is
+  * off, otherwise the configured number of records for compression.
+  */
+ private TempSortFileWriter getWriter() {
+   TempSortFileWriter writer = TempSortFileWriterFactory.getInstance()
+       .getTempSortFileWriter(parameters.isSortFileCompressionEnabled(),
+           parameters.getDimColCount(), parameters.getComplexDimColCount(),
+           parameters.getMeasureColCount(), parameters.getNoDictionaryCount(),
+           parameters.getFileWriteBufferSize());
+
+   boolean prefetchWithoutCompression =
+       parameters.isPrefetch() && !parameters.isSortFileCompressionEnabled();
+   if (prefetchWithoutCompression) {
+     return new SortTempFileChunkWriter(writer, parameters.getBufferSize());
+   }
+   return new SortTempFileChunkWriter(writer,
+       parameters.getSortTempFileNoOFRecordsInCompression());
+ }
+
+ /**
+ * Deletes the sort temp location if it exists, by delegating to
+ * CarbonDataProcessorUtil with the configured temp file location.
+ *
+ * @throws CarbonSortKeyAndGroupByException
+ */
+ public void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException {
+ CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
+ }
+
+ /**
+  * Stops accepting new page-sort tasks and waits (up to two days) until the tasks
+  * already submitted have finished. Despite its name this method does not start a merge
+  * itself.
+  *
+  * @throws CarbonSortKeyAndGroupByException if the waiting thread is interrupted
+  */
+ private void startFileBasedMerge() throws CarbonSortKeyAndGroupByException {
+   try {
+     dataSorterAndWriterExecutorService.shutdown();
+     dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS);
+   } catch (InterruptedException e) {
+     // keep the interrupt visible to callers further up the stack
+     Thread.currentThread().interrupt();
+     throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
+   }
+ }
+
+ /**
+ * Observer class for thread execution
+ * In case of any failure we need stop all the running thread
+ */
+ private class ThreadStatusObserver {
+ /**
+ * Below method will be called if any thread fails during execution. It stops the
+ * sorter thread pool, closes the intermediate merger, marks the load observer as
+ * failed, logs the failure and finally rethrows it wrapped.
+ *
+ * @param exception failure raised by the worker
+ * @throws CarbonSortKeyAndGroupByException always, wrapping the given exception
+ */
+ public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
+ dataSorterAndWriterExecutorService.shutdownNow();
+ unsafeInMemoryIntermediateFileMerger.close();
+ parameters.getObserver().setFailed(true);
+ LOGGER.error(exception);
+ throw new CarbonSortKeyAndGroupByException(exception);
+ }
+ }
+
+ /**
+  * Task that sorts one filled row page. Depending on the page's own save-to-disk flag
+  * the sorted page is either written to a sort temp file (and its memory released) or
+  * kept in memory and handed to the intermediate merger.
+  */
+ private class DataSorterAndWriter implements Callable<Void> {
+   // the page this task owns; not the enclosing rowPage field
+   private UnsafeCarbonRowPage page;
+
+   public DataSorterAndWriter(UnsafeCarbonRowPage rowPage) {
+     this.page = rowPage;
+   }
+
+   @Override public Void call() throws Exception {
+     try {
+       long startTime = System.currentTimeMillis();
+       TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
+           new UnsafeIntSortDataFormat(page));
+       if (parameters.getNoDictionaryCount() > 0) {
+         timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
+             new UnsafeRowComparator(page));
+       } else {
+         timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
+             new UnsafeRowComparatorForNormalDIms(parameters.getDimColCount(), page));
+       }
+       // Decide on the page being processed. The enclosing rowPage field already refers
+       // to the page that addRowBatch is filling now, so its flag belongs to another page.
+       if (page.isSaveToDisk()) {
+         // create a new file every time
+         File sortTempFile = new File(
+             parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System
+                 .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+         writeData(page, sortTempFile);
+         LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
+             + " and write is: " + (System.currentTimeMillis() - startTime));
+         page.freeMemory();
+         // register the sort temp file with the merger, which decides when to start
+         // intermediate merging of the files
+         unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile);
+       } else {
+         // keep the sorted page in memory and register it with the merger
+         unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(page);
+         LOGGER.info(
+             "Time taken to sort row page with size" + page.getBuffer().getActualSize() + "is: "
+                 + (System.currentTimeMillis() - startTime));
+       }
+     } catch (Throwable e) {
+       // stops the pool, marks the load as failed and rethrows wrapped
+       threadStatusObserver.notifyFailed(e);
+     }
+     return null;
+   }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
new file mode 100644
index 0000000..9ab2bdd
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort.unsafe.comparator;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.unsafe.CarbonUnsafe;
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
+
+/**
+ * Orders rows stored in unsafe memory by their sort dimensions, reading the dimension values
+ * directly from the rows' memory through CarbonUnsafe.
+ */
+public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
+
+  /**
+   * One flag per compared dimension, in the order the dimensions are laid out in a row:
+   * true means the value is read as a 2-byte length followed by that many bytes
+   * (no-dictionary dimension), false means it is read as a 4-byte int (dictionary dimension).
+   */
+  private final boolean[] noDictionaryColMaping;
+
+  /**
+   * Base object of the row page's data block; used for both rows by the two-argument compare.
+   */
+  private final Object baseObject;
+
+  public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
+    this.noDictionaryColMaping = rowPage.getNoDictionaryDimensionMapping();
+    this.baseObject = rowPage.getDataBlock().getBaseObject();
+  }
+
+  /**
+   * Compares two rows that are both addressed relative to the data block of the page this
+   * comparator was created with.
+   *
+   * @param rowL left row
+   * @param rowR right row
+   * @return negative, zero or positive when rowL sorts before, equal to or after rowR
+   */
+  @Override public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
+    // Same walk as the four-argument overload, with both rows resolved against one base object.
+    return compare(rowL, baseObject, rowR, baseObject);
+  }
+
+  /**
+   * Compares two rows that may be addressed relative to different base objects. Dimensions are
+   * compared in order and the first non-equal dimension decides the result.
+   *
+   * @param rowL        left row
+   * @param baseObjectL base object the left row's address is resolved against
+   * @param rowR        right row
+   * @param baseObjectR base object the right row's address is resolved against
+   * @return negative, zero or positive when rowL sorts before, equal to or after rowR
+   */
+  public int compare(UnsafeCarbonRow rowL, Object baseObjectL, UnsafeCarbonRow rowR,
+      Object baseObjectR) {
+    long rowA = rowL.address;
+    long rowB = rowR.address;
+    // Running byte offsets into each row; they diverge once variable-length values are read.
+    int sizeA = 0;
+    int sizeB = 0;
+    for (boolean isNoDictionary : noDictionaryColMaping) {
+      if (isNoDictionary) {
+        byte[] byteArr1 = readLengthPrefixedBytes(baseObjectL, rowA + sizeA);
+        sizeA += 2 + byteArr1.length;
+
+        byte[] byteArr2 = readLengthPrefixedBytes(baseObjectR, rowB + sizeB);
+        sizeB += 2 + byteArr2.length;
+
+        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+        if (difference != 0) {
+          return difference;
+        }
+      } else {
+        int dimFieldA = CarbonUnsafe.unsafe.getInt(baseObjectL, rowA + sizeA);
+        sizeA += 4;
+        int dimFieldB = CarbonUnsafe.unsafe.getInt(baseObjectR, rowB + sizeB);
+        sizeB += 4;
+        // Integer.compare rather than subtraction: a - b overflows when the operands are far
+        // apart (e.g. a large positive and a negative value) and then reports the wrong order.
+        int diff = Integer.compare(dimFieldA, dimFieldB);
+        if (diff != 0) {
+          return diff;
+        }
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Copies one length-prefixed value out of unsafe memory: a 2-byte length at the given offset
+   * followed by that many bytes.
+   */
+  private static byte[] readLengthPrefixedBytes(Object base, long offset) {
+    short length = CarbonUnsafe.unsafe.getShort(base, offset);
+    byte[] bytes = new byte[length];
+    CarbonUnsafe.unsafe
+        .copyMemory(base, offset + 2, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+    return bytes;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
new file mode 100644
index 0000000..8794753
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort.unsafe.comparator;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.unsafe.CarbonUnsafe;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
+
+/**
+ * Orders rows stored in unsafe memory whose first dimCount fields are all 4-byte ints, reading
+ * the values directly from the rows' memory through CarbonUnsafe.
+ */
+public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbonRow> {
+
+  /** Base object of the row page's data block that row addresses are resolved against. */
+  private final Object baseObject;
+
+  /** Number of leading 4-byte int fields compared for each row. */
+  private final int dimCount;
+
+  public UnsafeRowComparatorForNormalDIms(int dimCount, UnsafeCarbonRowPage rowPage) {
+    this.baseObject = rowPage.getDataBlock().getBaseObject();
+    this.dimCount = dimCount;
+  }
+
+  /**
+   * Compares two rows field by field; the first non-equal int field decides the result.
+   *
+   * @param rowL left row
+   * @param rowR right row
+   * @return negative, zero or positive when rowL sorts before, equal to or after rowR
+   */
+  @Override public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
+    long rowA = rowL.address;
+    long rowB = rowR.address;
+    // Every compared field is 4 bytes wide, so both rows share the same running offset.
+    long offset = 0;
+    for (int i = 0; i < dimCount; i++) {
+      int dimFieldA = CarbonUnsafe.unsafe.getInt(baseObject, rowA + offset);
+      int dimFieldB = CarbonUnsafe.unsafe.getInt(baseObject, rowB + offset);
+      offset += 4;
+      // Integer.compare rather than subtraction: a - b overflows when the operands are far
+      // apart (e.g. a large positive and a negative value) and then reports the wrong order.
+      int diff = Integer.compare(dimFieldA, dimFieldB);
+      if (diff != 0) {
+        return diff;
+      }
+    }
+
+    return 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/SortTempChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/SortTempChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/SortTempChunkHolder.java
new file mode 100644
index 0000000..8d199e2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/SortTempChunkHolder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
+
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+
+/**
+ * Interface for merging temporary sort files / in-memory data.
+ *
+ * Holders are comparable with each other; the in-memory implementation
+ * UnsafeFinalMergePageHolder compares the rows currently returned by getRow().
+ */
+public interface SortTempChunkHolder extends Comparable<SortTempChunkHolder> {
+
+ /**
+ * @return true while there is another row to load with readRow()
+ */
+ boolean hasNext();
+
+ /**
+ * Loads the next row so that getRow() returns it.
+ *
+ * @throws CarbonSortKeyAndGroupByException declared for implementations whose read can fail
+ */
+ void readRow() throws CarbonSortKeyAndGroupByException;
+
+ /**
+ * @return the row loaded by the most recent readRow() call
+ */
+ Object[] getRow();
+
+ /**
+ * @return number of rows in this holder; UnsafeFinalMergePageHolder returns the total entry
+ * count, not the number of rows still unread
+ */
+ int numberOfRows();
+
+ /**
+ * Releases what the holder keeps; UnsafeFinalMergePageHolder frees the memory of its row pages.
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRow.java
new file mode 100644
index 0000000..b71abf1
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRow.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
+
+/**
+ * Mutable holder for the location of one row kept in unsafe memory.
+ */
+public class UnsafeCarbonRow {
+
+ /**
+ * Start of the row. The unsafe row comparators add field offsets to this value and pass the
+ * sum, together with a data block's base object, to CarbonUnsafe reads.
+ */
+ public long address;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRowForMerge.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
new file mode 100644
index 0000000..3534b6d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
+
+/**
+ * Row location that additionally carries a one-byte index, for use while merging.
+ */
+public class UnsafeCarbonRowForMerge extends UnsafeCarbonRow {
+
+ /**
+ * NOTE(review): presumably the index of the row page this row came from (byte-sized, like the
+ * rowPageIndexes array in UnsafeFinalMergePageHolder) - confirm against the merger.
+ */
+ public byte index;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
new file mode 100644
index 0000000..5bce745
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator;
+
+/**
+ * SortTempChunkHolder over the output of an in-memory intermediate merge: rows are served in
+ * the merged order by looking each entry up in the row page it is stored in.
+ */
+public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeFinalMergePageHolder.class.getName());
+
+  /** Position of the next merged entry that readRow() will load. */
+  private int counter;
+
+  /** Total number of merged entries. */
+  private final int actualSize;
+
+  /** Row address of each merged entry, in merged order. */
+  private final long[] mergedAddresses;
+
+  /** For each merged entry, the index into rowPages of the page holding the row. */
+  private final byte[] rowPageIndexes;
+
+  private final UnsafeCarbonRowPage[] rowPages;
+
+  private final NewRowComparator comparator;
+
+  /** Row most recently loaded by readRow(); null until the first read. */
+  private Object[] currentRow;
+
+  /** Length of the Object[] allocated for every row. */
+  private final int columnSize;
+
+  public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
+      boolean[] noDictMapping, int columnSize) {
+    this.actualSize = merger.getEntryCount();
+    this.mergedAddresses = merger.getMergedAddresses();
+    this.rowPageIndexes = merger.getRowPageIndexes();
+    this.rowPages = merger.getUnsafeCarbonRowPages();
+    LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
+    this.comparator = new NewRowComparator(noDictMapping);
+    this.columnSize = columnSize;
+  }
+
+  /**
+   * @return true while not every merged entry has been read
+   */
+  @Override public boolean hasNext() {
+    return counter < actualSize;
+  }
+
+  /**
+   * Loads the next merged entry into a freshly allocated row and advances the position.
+   */
+  @Override public void readRow() {
+    Object[] row = new Object[columnSize];
+    currentRow = row;
+    UnsafeCarbonRowPage page = rowPages[rowPageIndexes[counter]];
+    page.getRow(mergedAddresses[counter], row);
+    counter++;
+  }
+
+  /**
+   * @return the row loaded by the last readRow() call
+   */
+  @Override public Object[] getRow() {
+    return currentRow;
+  }
+
+  /**
+   * Orders holders by their current rows.
+   */
+  @Override public int compareTo(SortTempChunkHolder o) {
+    return comparator.compare(currentRow, o.getRow());
+  }
+
+  /**
+   * @return total number of merged entries, independent of how many were already read
+   */
+  @Override public int numberOfRows() {
+    return actualSize;
+  }
+
+  /**
+   * Frees the memory of every row page backing this holder.
+   */
+  @Override public void close() {
+    for (UnsafeCarbonRowPage page : rowPages) {
+      page.freeMemory();
+    }
+  }
+}