You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/07/20 12:44:32 UTC
carbondata git commit: [CARBONDATA-1318]Fixed Concurrent table data
loading unsafe memory issue
Repository: carbondata
Updated Branches:
refs/heads/master 69c634c5f -> df22368d9
[CARBONDATA-1318]Fixed Concurrent table data loading unsafe memory issue
Fixed task cancellation leak issue
Fixed task cleanup issue in data loading
Fixed Concurrent table data loading unsafe memory issue
@CarbonDataQA
This closes #1185
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/df22368d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/df22368d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/df22368d
Branch: refs/heads/master
Commit: df22368d98f0390cc7f9c1289a81257adf4509a6
Parents: 69c634c
Author: kumarvishal <ku...@gmail.com>
Authored: Wed Jul 19 13:42:52 2017 +0530
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Thu Jul 20 18:12:52 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 4 +-
.../page/UnsafeFixLengthColumnPage.java | 14 +-
.../page/UnsafeVarLengthColumnPage.java | 13 +-
.../core/memory/IntPointerBuffer.java | 42 ++--
.../core/memory/UnsafeMemoryManager.java | 99 +++++---
.../core/memory/UnsafeSortMemoryManager.java | 249 +++++++++++++++++++
.../executor/impl/AbstractQueryExecutor.java | 3 +
.../carbondata/core/util/CarbonTaskInfo.java | 40 +++
.../core/util/ThreadLocalTaskInfo.java | 33 +++
.../apache/carbondata/spark/rdd/CarbonRDD.scala | 5 +-
.../spark/rdd/NewCarbonDataLoadRDD.scala | 24 +-
.../carbondata/spark/util/CommonUtil.scala | 23 +-
.../processing/csvload/CSVInputFormat.java | 3 +
.../processing/newflow/DataLoadExecutor.java | 19 +-
.../sort/unsafe/UnsafeCarbonRowPage.java | 32 ++-
.../newflow/sort/unsafe/UnsafeSortDataRows.java | 54 +++-
16 files changed, 554 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ccb6344..dfc2153 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1184,9 +1184,11 @@ public final class CarbonCommonConstants {
public static final String OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT = "64";
@CarbonProperty
public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB = "sort.inmemory.size.inmb";
-
public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT = "1024";
+ @CarbonProperty
+ public static final String UNSAFE_WORKING_MEMORY_IN_MB = "carbon.unsafe.working.memory.in.mb";
+ public static final String UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT = "512";
/**
* Sorts the data in batches and writes the batch data to store with index file.
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index e76c2c4..5dcc685 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE;
@@ -42,6 +43,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
// base offset of memoryBlock
private long baseOffset;
+ private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+
private static final int byteBits = BYTE.getSizeBits();
private static final int shortBits = DataType.SHORT.getSizeBits();
private static final int intBits = DataType.INT.getSizeBits();
@@ -59,13 +62,13 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
case FLOAT:
case DOUBLE:
int size = pageSize << dataType.getSizeBits();
- memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(size);
+ memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
baseAddress = memoryBlock.getBaseObject();
baseOffset = memoryBlock.getBaseOffset();
break;
case SHORT_INT:
size = pageSize * 3;
- memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(size);
+ memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
baseAddress = memoryBlock.getBaseObject();
baseOffset = memoryBlock.getBaseOffset();
break;
@@ -302,7 +305,7 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
public void freeMemory() {
if (memoryBlock != null) {
- UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
+ UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
memoryBlock = null;
baseAddress = null;
baseOffset = 0;
@@ -360,13 +363,14 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
// use raw compression and copy to byte[]
int inputSize = pageSize * dataType.getSizeInBytes();
int compressedMaxSize = compressor.maxCompressedLength(inputSize);
- MemoryBlock compressed = UnsafeMemoryManager.allocateMemoryWithRetry(compressedMaxSize);
+ MemoryBlock compressed =
+ UnsafeMemoryManager.allocateMemoryWithRetry(taskId, compressedMaxSize);
long outSize = compressor.rawCompress(baseOffset, inputSize, compressed.getBaseOffset());
assert outSize < Integer.MAX_VALUE;
byte[] output = new byte[(int) outSize];
CarbonUnsafe.unsafe.copyMemory(compressed.getBaseObject(), compressed.getBaseOffset(), output,
CarbonUnsafe.BYTE_ARRAY_OFFSET, outSize);
- UnsafeMemoryManager.INSTANCE.freeMemory(compressed);
+ UnsafeMemoryManager.INSTANCE.freeMemory(taskId, compressed);
return output;
} else {
return super.compress(compressor);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index dd6abc5..0cd64db 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
// This extension uses unsafe memory to store page data, for variable length data type (string,
// decimal)
@@ -47,6 +48,8 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
private static final double FACTOR = 1.25;
+ private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+
/**
* create a page
* @param dataType data type
@@ -55,7 +58,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
UnsafeVarLengthColumnPage(DataType dataType, int pageSize) throws MemoryException {
super(dataType, pageSize);
capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
- memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity));
+ memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long)(capacity));
baseAddress = memoryBlock.getBaseObject();
baseOffset = memoryBlock.getBaseOffset();
}
@@ -69,7 +72,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int capacity) throws MemoryException {
super(dataType, pageSize);
this.capacity = capacity;
- memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity));
+ memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long)(capacity));
baseAddress = memoryBlock.getBaseObject();
baseOffset = memoryBlock.getBaseOffset();
}
@@ -77,7 +80,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
@Override
public void freeMemory() {
if (memoryBlock != null) {
- UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
+ UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
memoryBlock = null;
baseAddress = null;
baseOffset = 0;
@@ -90,10 +93,10 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
private void ensureMemory(int requestSize) throws MemoryException {
if (totalLength + requestSize > capacity) {
int newSize = 2 * capacity;
- MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(newSize);
+ MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize);
CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset,
newBlock.getBaseObject(), newBlock.getBaseOffset(), capacity);
- UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
+ UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
memoryBlock = newBlock;
baseAddress = newBlock.getBaseObject();
baseOffset = newBlock.getBaseOffset();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
index 0d604fd..dadb1e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
@@ -17,26 +17,32 @@
package org.apache.carbondata.core.memory;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
/**
* Holds the pointers for rows.
*/
public class IntPointerBuffer {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(IntPointerBuffer.class.getName());
+
private int length;
private int actualSize;
private int[] pointerBlock;
- private MemoryBlock baseBlock;
-
private MemoryBlock pointerMemoryBlock;
- public IntPointerBuffer(MemoryBlock baseBlock) {
+ private long taskId;
+
+ public IntPointerBuffer(long taskId) {
// TODO can be configurable, it is initial size and it can grow automatically.
this.length = 100000;
pointerBlock = new int[length];
- this.baseBlock = baseBlock;
+ this.taskId = taskId;
}
public IntPointerBuffer(int length) {
@@ -67,24 +73,25 @@ public class IntPointerBuffer {
return pointerBlock[rowId];
}
- public void loadToUnsafe() throws MemoryException {
- pointerMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(pointerBlock.length * 4);
- for (int i = 0; i < pointerBlock.length; i++) {
- CarbonUnsafe.unsafe
- .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4,
- pointerBlock[i]);
+ public void loadToUnsafe() {
+ try {
+ pointerMemoryBlock =
+ UnsafeSortMemoryManager.allocateMemoryWithRetry(this.taskId, pointerBlock.length * 4);
+ for (int i = 0; i < pointerBlock.length; i++) {
+ CarbonUnsafe.unsafe
+ .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4,
+ pointerBlock[i]);
+ }
+ pointerBlock = null;
+ } catch (MemoryException e) {
+ LOGGER.warn("Not enough memory for allocating pointer buffer, sorting in heap");
}
- pointerBlock = null;
}
public int getActualSize() {
return actualSize;
}
- public MemoryBlock getBaseBlock() {
- return baseBlock;
- }
-
public int[] getPointerBlock() {
return pointerBlock;
}
@@ -103,10 +110,7 @@ public class IntPointerBuffer {
public void freeMemory() {
pointerBlock = null;
if (pointerMemoryBlock != null) {
- UnsafeMemoryManager.INSTANCE.freeMemory(pointerMemoryBlock);
- }
- if (baseBlock != null) {
- UnsafeMemoryManager.INSTANCE.freeMemory(baseBlock);
+ UnsafeSortMemoryManager.INSTANCE.freeMemory(this.taskId, pointerMemoryBlock);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 28e63a9..991bc90 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -17,7 +17,10 @@
package org.apache.carbondata.core.memory;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import org.apache.carbondata.common.logging.LogService;
@@ -36,23 +39,23 @@ public class UnsafeMemoryManager {
private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
+ private static Map<Long,Set<MemoryBlock>> taskIdToMemoryBlockMap;
static {
long size;
try {
size = Long.parseLong(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
- CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
+ .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT));
} catch (Exception e) {
size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
LOGGER.info("Wrong memory size given, "
+ "so setting default value to " + size);
}
- if (size < 1024) {
- size = 1024;
- LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
+ if (size < 512) {
+ size = 512;
+ LOGGER.info("It is not recommended to keep unsafe memory size less than 512MB, "
+ "so setting default value to " + size);
}
-
long takenSize = size * 1024 * 1024;
MemoryAllocator allocator;
if (offHeap) {
@@ -65,6 +68,7 @@ public class UnsafeMemoryManager {
allocator = MemoryAllocator.HEAP;
}
INSTANCE = new UnsafeMemoryManager(takenSize, allocator);
+ taskIdToMemoryBlockMap = new HashMap<>();
}
public static final UnsafeMemoryManager INSTANCE;
@@ -75,76 +79,91 @@ public class UnsafeMemoryManager {
private MemoryAllocator allocator;
- private long minimumMemory;
-
- // for debug purpose
- private Set<MemoryBlock> set = new HashSet<>();
-
private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
this.totalMemory = totalMemory;
this.allocator = allocator;
- long numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
- long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
- sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024;
- long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores;
- if (totalWorkingMemoryForAllThreads >= totalMemory) {
- throw new RuntimeException("Working memory should be less than total memory configured, "
- + "so either reduce the loading threads or increase the memory size. "
- + "(Number of threads * number of threads) should be less than total unsafe memory");
- }
- minimumMemory = totalWorkingMemoryForAllThreads;
- LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator
- + " and minimum reserve memory " + minimumMemory);
+ LOGGER
+ .info("Working Memory manager is created with size " + totalMemory + " with " + allocator);
}
- private synchronized MemoryBlock allocateMemory(long memoryRequested) {
+ private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
if (memoryUsed + memoryRequested <= totalMemory) {
MemoryBlock allocate = allocator.allocate(memoryRequested);
memoryUsed += allocate.size();
if (LOGGER.isDebugEnabled()) {
- set.add(allocate);
- LOGGER.error("Memory block (" + allocate + ") is created with size " + allocate.size() +
- ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes");
+ LOGGER.debug(
+ "Working Memory block (" + allocate + ") is created with size " + allocate.size()
+ + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
+ + "Bytes");
+ }
+ Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
+ if (null == listOfMemoryBlock) {
+ listOfMemoryBlock = new HashSet<>();
+ taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
}
+ listOfMemoryBlock.add(allocate);
return allocate;
}
return null;
}
- public synchronized void freeMemory(MemoryBlock memoryBlock) {
+ public synchronized void freeMemory(long taskId,MemoryBlock memoryBlock) {
+ taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
allocator.free(memoryBlock);
memoryUsed -= memoryBlock.size();
memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
if (LOGGER.isDebugEnabled()) {
- set.remove(memoryBlock);
- LOGGER.error("Memory block (" + memoryBlock + ") released. Total memory used " + memoryUsed +
- "Bytes, left " + getAvailableMemory() + "Bytes. Total allocated block: " + set.size());
+ LOGGER.debug(
+ "Freeing memory of size: " + memoryBlock.size() + ": Current available memory is: " + (
+ totalMemory - memoryUsed));
}
}
- private synchronized long getAvailableMemory() {
- return totalMemory - memoryUsed;
+ public void freeMemoryAll(long taskId) {
+ Set<MemoryBlock> memoryBlockSet = null;
+ synchronized (INSTANCE) {
+ memoryBlockSet = taskIdToMemoryBlockMap.remove(taskId);
+ }
+ long occuppiedMemory = 0;
+ if (null != memoryBlockSet) {
+ Iterator<MemoryBlock> iterator = memoryBlockSet.iterator();
+ MemoryBlock memoryBlock = null;
+ while (iterator.hasNext()) {
+ memoryBlock = iterator.next();
+ occuppiedMemory += memoryBlock.size();
+ allocator.free(memoryBlock);
+ }
+ }
+ synchronized (INSTANCE) {
+ memoryUsed -= occuppiedMemory;
+ memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Freeing memory of size: " + occuppiedMemory + ": Current available memory is: " + (
+ totalMemory - memoryUsed));
+ }
}
- public boolean isMemoryAvailable() {
- return getAvailableMemory() > minimumMemory;
+ public synchronized boolean isMemoryAvailable() {
+ return memoryUsed > totalMemory;
}
public long getUsableMemory() {
- return totalMemory - minimumMemory;
+ return totalMemory;
}
/**
* It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
*/
- public static MemoryBlock allocateMemoryWithRetry(long size) throws MemoryException {
+ public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException {
MemoryBlock baseBlock = null;
int tries = 0;
- while (tries < 100) {
- baseBlock = INSTANCE.allocateMemory(size);
+ while (tries < 300) {
+ baseBlock = INSTANCE.allocateMemory(taskId, size);
if (baseBlock == null) {
try {
- Thread.sleep(50);
+ Thread.sleep(500);
} catch (InterruptedException e) {
throw new MemoryException(e);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
new file mode 100644
index 0000000..d975cd4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.memory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Memory manager to keep track of
+ * all memory for storing the sorted data
+ */
+public class UnsafeSortMemoryManager {
+
+ /**
+ * logger
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(UnsafeSortMemoryManager.class.getName());
+
+ /**
+ * offheap is enabled
+ */
+ private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+ CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
+
+ /**
+ * map to keep taskid to memory blocks
+ */
+ private static Map<Long, Set<MemoryBlock>> taskIdToMemoryBlockMap;
+
+ /**
+ * singleton instance
+ */
+ public static final UnsafeSortMemoryManager INSTANCE;
+
+ /**
+ * total memory available for sort data storage
+ */
+ private long totalMemory;
+
+ /**
+ * current memory used
+ */
+ private long memoryUsed;
+
+ /**
+ * current memory allocator
+ */
+ private MemoryAllocator allocator;
+
+ static {
+ long size;
+ try {
+ size = Long.parseLong(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
+ CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
+ } catch (Exception e) {
+ size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
+ LOGGER.info("Wrong memory size given, " + "so setting default value to " + size);
+ }
+ if (size < 1024) {
+ size = 1024;
+ LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
+ + "so setting default value to " + size);
+ }
+
+ long takenSize = size * 1024 * 1024;
+ MemoryAllocator allocator;
+ if (offHeap) {
+ allocator = MemoryAllocator.UNSAFE;
+ } else {
+ long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
+ if (takenSize > maxMemory) {
+ takenSize = maxMemory;
+ }
+ allocator = MemoryAllocator.HEAP;
+ }
+ INSTANCE = new UnsafeSortMemoryManager(takenSize, allocator);
+ taskIdToMemoryBlockMap = new HashMap<>();
+ }
+
+ private UnsafeSortMemoryManager(long totalMemory, MemoryAllocator allocator) {
+ this.totalMemory = totalMemory;
+ this.allocator = allocator;
+ LOGGER.info("Sort Memory manager is created with size " + totalMemory + " with " + allocator);
+ }
+
+ /**
+ * Below method will be used to check whether memory required is
+ * available or not
+ *
+ * @param required
+ * @return if memory available
+ */
+ public synchronized boolean isMemoryAvailable(long required) {
+ return memoryUsed + required < totalMemory;
+ }
+
+ /**
+ * Below method will be used to allocate dummy memory
+ * this will be used to allocate first and then used when u need
+ *
+ * @param size
+ */
+ public synchronized void allocateDummyMemory(long size) {
+ memoryUsed += size;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Working Memory block (" + size + ") is created with size " + size
+ + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
+ + "Bytes");
+ }
+ }
+
+ public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
+ allocator.free(memoryBlock);
+ taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
+ memoryUsed -= memoryBlock.size();
+ memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Freeing memory of size: " + memoryBlock.size() + ": Current available memory is: " + (
+ totalMemory - memoryUsed));
+ }
+ }
+
+ /**
+ * Below method will be used to free all the
+ * memory occupied for a task, this will be useful
+ * when in case of task failure we need to clear all the memory occupied
+ * @param taskId
+ */
+ public void freeMemoryAll(long taskId) {
+ Set<MemoryBlock> memoryBlockSet = null;
+ synchronized (INSTANCE) {
+ memoryBlockSet = taskIdToMemoryBlockMap.remove(taskId);
+ }
+ long occuppiedMemory = 0;
+ if (null != memoryBlockSet) {
+ Iterator<MemoryBlock> iterator = memoryBlockSet.iterator();
+ MemoryBlock memoryBlock = null;
+ while (iterator.hasNext()) {
+ memoryBlock = iterator.next();
+ occuppiedMemory += memoryBlock.size();
+ allocator.free(memoryBlock);
+ }
+ }
+ synchronized (INSTANCE) {
+ memoryUsed -= occuppiedMemory;
+ memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Freeing memory of size: " + occuppiedMemory + ": Current available memory is: " + (
+ totalMemory - memoryUsed));
+ }
+ }
+
+ /**
+ * Before calling this method caller should call allocateMemoryDummy
+ * This method will be used to allocate the memory, this can be used
+ * when caller wants to allocate memory first and used it anytime
+ * @param taskId
+ * @param memoryRequested
+ * @return memory block
+ */
+ public synchronized MemoryBlock allocateMemoryLazy(long taskId, long memoryRequested) {
+ MemoryBlock allocate = allocator.allocate(memoryRequested);
+ Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
+ if (null == listOfMemoryBlock) {
+ listOfMemoryBlock = new HashSet<>();
+ taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
+ }
+ listOfMemoryBlock.add(allocate);
+ return allocate;
+ }
+
+ /**
+ * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
+ */
+ public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException {
+ MemoryBlock baseBlock = null;
+ int tries = 0;
+ while (tries < 100) {
+ baseBlock = INSTANCE.allocateMemory(taskId, size);
+ if (baseBlock == null) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ throw new MemoryException(e);
+ }
+ } else {
+ break;
+ }
+ tries++;
+ }
+ if (baseBlock == null) {
+ throw new MemoryException("Not enough memory");
+ }
+ return baseBlock;
+ }
+
+ private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
+ if (memoryUsed + memoryRequested <= totalMemory) {
+ MemoryBlock allocate = allocator.allocate(memoryRequested);
+ memoryUsed += allocate.size();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Working Memory block (" + allocate.size() + ") is created with size " + allocate.size()
+ + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
+ + "Bytes");
+ }
+ Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
+ if (null == listOfMemoryBlock) {
+ listOfMemoryBlock = new HashSet<>();
+ taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
+ }
+ listOfMemoryBlock.add(allocate);
+ return allocate;
+ }
+ return null;
+ }
+
+ public static boolean isOffHeap() {
+ return offHeap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index ff54673..faa4564 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -62,6 +63,7 @@ import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.commons.lang3.ArrayUtils;
@@ -519,6 +521,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
if (null != queryIterator) {
queryIterator.close();
}
+ UnsafeMemoryManager.INSTANCE.freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
if (null != queryProperties.executorService) {
queryProperties.executorService.shutdown();
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/util/CarbonTaskInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonTaskInfo.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonTaskInfo.java
new file mode 100644
index 0000000..d3e4d7a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonTaskInfo.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.io.Serializable;
+
+/**
+ * Value object to keep track of all the thread local variable
+ */
+public class CarbonTaskInfo implements Serializable {
+
+ /**
+ * serial version id
+ */
+ private static final long serialVersionUID = 1L;
+
+ public long taskId;
+
+ public long getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(long taskId) {
+ this.taskId = taskId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalTaskInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalTaskInfo.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalTaskInfo.java
new file mode 100644
index 0000000..8c871b8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalTaskInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+/**
+ * Class to keep all the thread local variables for a task
+ */
+public class ThreadLocalTaskInfo {
+ static final InheritableThreadLocal<CarbonTaskInfo> threadLocal =
+ new InheritableThreadLocal<CarbonTaskInfo>();
+
+ public static void setCarbonTaskInfo(CarbonTaskInfo carbonTaskInfo) {
+ threadLocal.set(carbonTaskInfo);
+ }
+
+ public static CarbonTaskInfo getCarbonTaskInfo() {
+ return threadLocal.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 106a9fd..48e97ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
-import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, CarbonTaskInfo, SessionParams, ThreadLocalSessionInfo, ThreadLocalTaskInfo}
/**
* This RDD maintains session level ThreadLocal
@@ -41,6 +41,9 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
final def compute(split: Partition, context: TaskContext): Iterator[T] = {
ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+ val carbonTaskInfo = new CarbonTaskInfo
+ carbonTaskInfo.setTaskId(System.nanoTime)
+ ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
internalCompute(split, context)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index d325f71..ac1c723 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.common.logging.impl.StandardLogService
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
import org.apache.carbondata.processing.csvload.BlockDetails
import org.apache.carbondata.processing.csvload.CSVInputFormat
import org.apache.carbondata.processing.csvload.CSVRecordReaderIterator
@@ -240,7 +240,11 @@ class NewCarbonDataLoadRDD[K, V](
loadMetadataDetails)
// Intialize to set carbon properties
loader.initialize()
- new DataLoadExecutor().execute(model,
+ val executor = new DataLoadExecutor()
+ // in case of success, failure or cancellation clear memory and stop execution
+ context.addTaskCompletionListener { context => executor.close()
+ CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)}
+ executor.execute(model,
loader.storeLocation,
recordReaders)
} catch {
@@ -327,7 +331,6 @@ class NewCarbonDataLoadRDD[K, V](
}
}
}
-
/**
* generate blocks id
*
@@ -423,7 +426,6 @@ class NewDataFrameLoaderRDD[K, V](
recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition,
carbonLoadModel, context)
}
-
val loader = new SparkPartitionLoader(model,
theSplit.index,
null,
@@ -431,7 +433,11 @@ class NewDataFrameLoaderRDD[K, V](
loadMetadataDetails)
// Intialize to set carbon properties
loader.initialize()
- new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)
+ val executor = new DataLoadExecutor
+ // in case of success, failure or cancellation clear memory and stop execution
+ context.addTaskCompletionListener { context => executor.close()
+ CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)}
+ executor.execute(model, loader.storeLocation, recordReaders.toArray)
} catch {
case e: BadRecordFoundException =>
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
@@ -606,7 +612,6 @@ class PartitionTableDataLoaderRDD[K, V](
carbonLoadModel.setSegmentId(String.valueOf(loadCount))
carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index)))
carbonLoadModel.setPreFetch(false)
-
val recordReaders = Array[CarbonIterator[Array[AnyRef]]] {
new NewRddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel, context)
}
@@ -618,7 +623,11 @@ class PartitionTableDataLoaderRDD[K, V](
loadMetadataDetails)
// Intialize to set carbon properties
loader.initialize()
- new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
+ val executor = new DataLoadExecutor
+ // in case of success, failure or cancellation clear memory and stop execution
+ context.addTaskCompletionListener { context => executor.close()
+ CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)}
+ executor.execute(model, loader.storeLocation, recordReaders)
} catch {
case e: BadRecordFoundException =>
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
@@ -642,7 +651,6 @@ class PartitionTableDataLoaderRDD[K, V](
}
}
var finished = false
-
override def hasNext: Boolean = !finished
override def next(): (K, V) = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index bb8c5a6..579347b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -17,6 +17,7 @@
package org.apache.carbondata.spark.util
+
import java.text.SimpleDateFormat
import java.util
@@ -35,6 +36,7 @@ import org.apache.spark.util.FileUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager}
import org.apache.carbondata.core.metadata.datatype.DataType
import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
@@ -615,8 +617,21 @@ object CommonUtil {
result.result()
}
- def partitionInfoOutput: Seq[Attribute] = Seq(
- AttributeReference("partition", StringType, nullable = false,
- new MetadataBuilder().putString("comment", "partitions info").build())()
- )
+ def partitionInfoOutput: Seq[Attribute] = {
+ Seq(
+ AttributeReference("partition", StringType, nullable = false,
+ new MetadataBuilder().putString("comment", "partitions info").build())()
+ )
+ }
+
+ /**
+ * Method to clear the unsafe memory for a task,
+ * if any is present
+ */
+ def clearUnsafeMemory(taskId: Long) {
+ UnsafeMemoryManager.
+ INSTANCE.freeMemoryAll(taskId)
+ UnsafeSortMemoryManager.
+ INSTANCE.freeMemoryAll(taskId)
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
index e252e7f..3a6428d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
@@ -304,6 +304,9 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
if (boundedInputStream != null) {
boundedInputStream.close();
}
+ if (null != csvParser) {
+ csvParser.stopParsing();
+ }
} finally {
reader = null;
boundedInputStream = null;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
index 66e6d37..d4e79f8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
@@ -33,9 +33,12 @@ public class DataLoadExecutor {
private static final LogService LOGGER =
LogServiceFactory.getLogService(DataLoadExecutor.class.getName());
+ private AbstractDataLoadProcessorStep loadProcessorStep;
+
+ private boolean isClosed;
+
public void execute(CarbonLoadModel loadModel, String storeLocation,
CarbonIterator<Object[]>[] inputIterators) throws Exception {
- AbstractDataLoadProcessorStep loadProcessorStep = null;
try {
loadProcessorStep =
new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators);
@@ -60,10 +63,6 @@ public class DataLoadExecutor {
} finally {
removeBadRecordKey(
loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier());
- if (loadProcessorStep != null) {
- // 3. Close the step
- loadProcessorStep.close();
- }
}
}
@@ -91,4 +90,14 @@ public class DataLoadExecutor {
String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey();
BadRecordsLogger.removeBadRecordKey(badRecordLoggerKey);
}
+
+ /**
+ * Method to clean all the resource
+ */
+ public void close() {
+ if (!isClosed && loadProcessorStep != null) {
+ loadProcessorStep.close();
+ }
+ isClosed = true;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index 2ac138b..9d2ee9a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -25,6 +25,8 @@ import java.util.Arrays;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.IntPointerBuffer;
import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -55,9 +57,13 @@ public class UnsafeCarbonRowPage {
private boolean saveToDisk;
+ private MemoryManagerType managerType;
+
+ private long taskId;
+
public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
- MemoryBlock memoryBlock, boolean saveToDisk) {
+ MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
this.dimensionSize = dimensionSize;
@@ -65,10 +71,12 @@ public class UnsafeCarbonRowPage {
this.measureDataType = type;
this.saveToDisk = saveToDisk;
this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
- buffer = new IntPointerBuffer(memoryBlock);
- this.dataBlock = buffer.getBaseBlock();
+ this.taskId = taskId;
+ buffer = new IntPointerBuffer(this.taskId);
+ this.dataBlock = memoryBlock;
// TODO Only using 98% of space for safe side.May be we can have different logic.
sizeToBeUsed = dataBlock.size() - (dataBlock.size() * 5) / 100;
+ this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
}
public int addRow(Object[] row) {
@@ -324,7 +332,14 @@ public class UnsafeCarbonRowPage {
}
public void freeMemory() {
- buffer.freeMemory();
+ switch (managerType) {
+ case UNSAFE_MEMORY_MANAGER:
+ UnsafeMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
+ break;
+ default:
+ UnsafeSortMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
+ buffer.freeMemory();
+ }
}
public boolean isSaveToDisk() {
@@ -369,4 +384,13 @@ public class UnsafeCarbonRowPage {
public boolean[] getNoDictionarySortColumnMapping() {
return noDictionarySortColumnMapping;
}
+
+ public void setNewDataBlock(MemoryBlock newMemoryBlock) {
+ this.dataBlock = newMemoryBlock;
+ this.managerType = MemoryManagerType.UNSAFE_SORT_MEMORY_MANAGER;
+ }
+
+ public enum MemoryManagerType {
+ UNSAFE_MEMORY_MANAGER, UNSAFE_SORT_MEMORY_MANAGER
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df22368d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index a42d0ea..8021b45 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -31,12 +31,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.IntPointerBuffer;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparator;
import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms;
import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
@@ -86,6 +89,8 @@ public class UnsafeSortDataRows {
*/
private Semaphore semaphore;
+ private final long taskId;
+
public UnsafeSortDataRows(SortParameters parameters,
UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
this.parameters = parameters;
@@ -94,9 +99,9 @@ public class UnsafeSortDataRows {
// observer of writing file in thread
this.threadStatusObserver = new ThreadStatusObserver();
-
+ this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
this.inMemoryChunkSize = inMemoryChunkSize;
- this.inMemoryChunkSize = this.inMemoryChunkSize * 1024 * 1024;
+ this.inMemoryChunkSize = inMemoryChunkSize * 1024 * 1024;
enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT));
@@ -115,12 +120,18 @@ public class UnsafeSortDataRows {
* This method will be used to initialize
*/
public void initialize() throws MemoryException {
- MemoryBlock baseBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize);
+ MemoryBlock baseBlock =
+ UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+ boolean isMemoryAvailable =
+ UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
+ if (isMemoryAvailable) {
+ UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
+ }
this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
parameters.getNoDictionarySortColumn(),
parameters.getDimColCount() + parameters.getComplexDimColCount(),
parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock,
- !UnsafeMemoryManager.INSTANCE.isMemoryAvailable());
+ !isMemoryAvailable, taskId);
// Delete if any older file exists in sort temp folder
deleteSortLocationIfExists();
@@ -177,8 +188,13 @@ public class UnsafeSortDataRows {
unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
semaphore.acquire();
dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
- MemoryBlock memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize);
- boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
+ MemoryBlock memoryBlock =
+ UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+ boolean saveToDisk =
+ UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
+ if (!saveToDisk) {
+ UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
+ }
rowPage = new UnsafeCarbonRowPage(
parameters.getNoDictionaryDimnesionColumn(),
parameters.getNoDictionarySortColumn(),
@@ -186,7 +202,7 @@ public class UnsafeSortDataRows {
parameters.getMeasureColCount(),
parameters.getMeasureDataType(),
memoryBlock,
- saveToDisk);
+ saveToDisk, taskId);
bytesAdded += rowPage.addRow(rowBatch[i]);
} catch (Exception e) {
LOGGER.error(
@@ -214,14 +230,18 @@ public class UnsafeSortDataRows {
unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
semaphore.acquire();
dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
- MemoryBlock memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize);
- boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
+ MemoryBlock memoryBlock =
+ UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
+ boolean saveToDisk = UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
+ if (!saveToDisk) {
+ UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
+ }
rowPage = new UnsafeCarbonRowPage(
parameters.getNoDictionaryDimnesionColumn(),
parameters.getNoDictionarySortColumn(),
parameters.getDimColCount(), parameters.getMeasureColCount(),
parameters.getMeasureDataType(), memoryBlock,
- saveToDisk);
+ saveToDisk, taskId);
rowPage.addRow(row);
} catch (Exception e) {
LOGGER.error(
@@ -343,7 +363,7 @@ public class UnsafeSortDataRows {
timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
new UnsafeRowComparatorForNormalDIms(page));
}
- if (rowPage.isSaveToDisk()) {
+ if (page.isSaveToDisk()) {
// create a new file every time
File sortTempFile = new File(
parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System
@@ -356,6 +376,18 @@ public class UnsafeSortDataRows {
// intermediate merging of sort temp files will be triggered
unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile);
} else {
+ // create a new memory block; the size is already reserved,
+ // so call the lazy memory allocator
+ MemoryBlock newMemoryBlock = UnsafeSortMemoryManager.INSTANCE
+ .allocateMemoryLazy(taskId, page.getDataBlock().size());
+ // copying data from working memory manager to sort memory manager
+ CarbonUnsafe.unsafe
+ .copyMemory(page.getDataBlock().getBaseObject(), page.getDataBlock().getBaseOffset(),
+ newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(),
+ page.getDataBlock().size());
+ // free unsafe memory manager
+ page.freeMemory();
+ page.setNewDataBlock(newMemoryBlock);
// add sort temp filename to and arrayList. When the list size reaches 20 then
// intermediate merging of sort temp files will be triggered
page.getBuffer().loadToUnsafe();