Posted to issues@carbondata.apache.org by kumarvishal09 <gi...@git.apache.org> on 2017/07/19 08:44:39 UTC

[GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa...

GitHub user kumarvishal09 opened a pull request:

    https://github.com/apache/carbondata/pull/1185

    [WIP]Fixed Concurrent table data loading unsafe memory issue

    Fixed task cancellation leak issue
    Fixed task cleanup issue in data loading
    Fixed Concurrent table data loading unsafe memory issue
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kumarvishal09/incubator-carbondata ConcurrentTableLoadingIssue

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1185.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1185
    
----
commit 750c3f542e70bd65ffcd322874d82e14b98f11d4
Author: kumarvishal <ku...@gmail.com>
Date:   2017-07-19T08:12:52Z

    Fixed Concurrent table data loading unsafe memory issue
    Fixed task cancellation leak issue
    Fixed task cleanup issue in data loading

----
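The task cancellation and cleanup fixes listed above generally rely on one guarantee: however a task ends (success, failure or cancellation), the memory it still holds must be released. A minimal plain-Java sketch of that guarantee runs the task body inside `try`/`finally` and then calls a per-task release hook. The names `TaskCleanup`, `runWithCleanup` and `freeAllForTask` are hypothetical. In a Spark executor such a hook could be registered with `TaskContext.addTaskCompletionListener`, but this sketch does not claim that is how the patch wires it.

```java
import java.util.function.LongConsumer;

// Hypothetical helper: runs a task body and always releases the task's memory afterwards.
final class TaskCleanup {
  private TaskCleanup() {}

  static void runWithCleanup(long taskId, Runnable body, LongConsumer freeAllForTask) {
    try {
      body.run();
    } finally {
      // Runs on normal completion and also when body throws (e.g. on cancellation),
      // so blocks registered under taskId are not leaked.
      freeAllForTask.accept(taskId);
    }
  }
}
```

The exception from a failed body still propagates to the caller; the hook only guarantees that release happens first.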


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] carbondata issue #1185: [WIP]Fixed Concurrent table data loading unsafe memo...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1185
  
    Build Success with Spark 1.6. Please check CI: http://144.76.159.231:8080/job/ApacheCarbonPRBuilder/526/




[GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128227396
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---
    @@ -75,76 +79,67 @@
     
       private MemoryAllocator allocator;
     
    -  private long minimumMemory;
    -
    -  // for debug purpose
    -  private Set<MemoryBlock> set = new HashSet<>();
    -
       private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
         this.totalMemory = totalMemory;
         this.allocator = allocator;
    -    long numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
    -    long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
    -    sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024;
    -    long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores;
    -    if (totalWorkingMemoryForAllThreads >= totalMemory) {
    -      throw new RuntimeException("Working memory should be less than total memory configured, "
    -          + "so either reduce the loading threads or increase the memory size. "
    -          + "(Number of threads * number of threads) should be less than total unsafe memory");
    -    }
    -    minimumMemory = totalWorkingMemoryForAllThreads;
    -    LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator
    -        + " and minimum reserve memory " + minimumMemory);
    +    LOGGER
    +        .info("Working Memory manager is created with size " + totalMemory + " with " + allocator);
       }
     
    -  private synchronized MemoryBlock allocateMemory(long memoryRequested) {
    +  private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
         if (memoryUsed + memoryRequested <= totalMemory) {
           MemoryBlock allocate = allocator.allocate(memoryRequested);
           memoryUsed += allocate.size();
    -      if (LOGGER.isDebugEnabled()) {
    -        set.add(allocate);
    -        LOGGER.error("Memory block (" + allocate + ") is created with size "  + allocate.size() +
    -            ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes");
    +      Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
    +      if (null == listOfMemoryBlock) {
    +        listOfMemoryBlock = new HashSet<>();
    +        taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
           }
    +      listOfMemoryBlock.add(allocate);
           return allocate;
         }
         return null;
       }
     
    -  public synchronized void freeMemory(MemoryBlock memoryBlock) {
    +  public synchronized void freeMemory(long taskId,MemoryBlock memoryBlock) {
    +    taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
    --- End diff --
    
    Please add log
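The reviewer asks for logging in `freeMemory`. Note also that the quoted line `taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock)` throws a `NullPointerException` whenever the map has no entry for `taskId`, for instance if that task's blocks were already released. The sketch below shows a null-safe `freeMemory` that logs what it does. It uses `java.util.logging` as a stand-in for CarbonData's `LogService`, and `TrackedBlock` / `TaskAwareMemoryManager` are hypothetical simplifications (a block only carries a size; no real memory is allocated).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical block: records only its size, no real memory behind it.
final class TrackedBlock {
  final long size;
  TrackedBlock(long size) { this.size = size; }
}

final class TaskAwareMemoryManager {
  private static final Logger LOGGER =
      Logger.getLogger(TaskAwareMemoryManager.class.getName());
  private final Map<Long, Set<TrackedBlock>> taskIdToMemoryBlockMap = new HashMap<>();
  private long memoryUsed;

  // Registers a block as owned by taskId.
  synchronized void track(long taskId, TrackedBlock block) {
    taskIdToMemoryBlockMap.computeIfAbsent(taskId, k -> new HashSet<>()).add(block);
    memoryUsed += block.size;
  }

  // Null-safe free: an unknown task or an already-freed block is logged and ignored.
  synchronized boolean freeMemory(long taskId, TrackedBlock block) {
    Set<TrackedBlock> blocks = taskIdToMemoryBlockMap.get(taskId);
    if (blocks == null || !blocks.remove(block)) {
      LOGGER.warning("Task " + taskId + " does not own a block of " + block.size
          + " bytes; ignoring free request");
      return false;
    }
    if (blocks.isEmpty()) {
      taskIdToMemoryBlockMap.remove(taskId); // drop empty entries so the map does not grow
    }
    memoryUsed -= block.size;
    if (LOGGER.isLoggable(Level.FINE)) {
      LOGGER.fine("Freed " + block.size + " bytes for task " + taskId
          + ", total used now " + memoryUsed + " bytes");
    }
    return true;
  }

  synchronized long memoryUsed() { return memoryUsed; }
}
```

Returning a `boolean` is a choice made for this sketch; the quoted method returns `void`. The debug message is guarded with `isLoggable` so the string is only built when FINE logging is enabled, similar to the `LOGGER.isDebugEnabled()` guard that the diff removes.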



[GitHub] carbondata pull request #1185: [CARBONDATA-1318]Fixed Concurrent table data ...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128264917
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.memory;
    +
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +
    +/**
    + * Memory manager to keep track of
    + * all memory for storing the sorted data
    + */
    +public class UnsafeSortMemoryManager {
    +
    +  /**
    +   * logger
    +   */
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(UnsafeSortMemoryManager.class.getName());
    +
    +  /**
    +   * offheap is enabled
    +   */
    +  private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
    +      .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
    +          CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
    +
    +  /**
    +   * map to keep taskid to memory blocks
    +   */
    +  private static Map<Long, Set<MemoryBlock>> taskIdToMemoryBlockMap;
    +
    +  /**
    +   * singleton instance
    +   */
    +  public static final UnsafeSortMemoryManager INSTANCE;
    +
    +  /**
    +   * total memory available for sort data storage
    +   */
    +  private long totalMemory;
    +
    +  /**
    +   * current memory used
    +   */
    +  private long memoryUsed;
    +
    +  /**
    +   * current memory allocator
    +   */
    +  private MemoryAllocator allocator;
    +
    +  static {
    +    long size;
    +    try {
    +      size = Long.parseLong(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
    +              CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
    +    } catch (Exception e) {
    +      size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
    +      LOGGER.info("Wrong memory size given, " + "so setting default value to " + size);
    +    }
    +    if (size < 1024) {
    +      size = 1024;
    +      LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
    +          + "so setting default value to " + size);
    +    }
    +
    +    long takenSize = size * 1024 * 1024;
    +    MemoryAllocator allocator;
    +    if (offHeap) {
    +      allocator = MemoryAllocator.UNSAFE;
    +    } else {
    +      long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
    +      if (takenSize > maxMemory) {
    +        takenSize = maxMemory;
    +      }
    +      allocator = MemoryAllocator.HEAP;
    +    }
    +    INSTANCE = new UnsafeSortMemoryManager(takenSize, allocator);
    +    taskIdToMemoryBlockMap = new HashMap<>();
    +  }
    +
    +  private UnsafeSortMemoryManager(long totalMemory, MemoryAllocator allocator) {
    +    this.totalMemory = totalMemory;
    +    this.allocator = allocator;
    +    LOGGER.info("Sort Memory manager is created with size " + totalMemory + " with " + allocator);
    +  }
    +
    +  /**
    +   * Below method will be used to check whether memory required is
    +   * available or not
    +   *
    +   * @param required
    +   * @return if memory available
    +   */
    +  public synchronized boolean isMemoryAvailable(long required) {
    +    return memoryUsed + required < totalMemory;
    +  }
    +
    +  /**
    +   * Below method will be used to allocate dummy memory
    +   * this will be used to allocate first and then used when u need
    +   *
    +   * @param size
    +   */
    +  public synchronized void allocateDummyMemory(long size) {
    +    memoryUsed += size;
    +  }
    +
    +  /**
    +   * Get total memory available
    +   *
    +   * @return amount of memory available
    +   */
    +  public long getUsableMemory() {
    +    return totalMemory;
    +  }
    +
    +  public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
    +    allocator.free(memoryBlock);
    +    taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
    +    memoryUsed -= memoryBlock.size();
    +    memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
    +  }
    +
    +  public void freeMemoryAll(long taskId) {
    +    Set<MemoryBlock> memoryBlocks = null;
    +    synchronized (INSTANCE) {
    +      memoryBlocks = taskIdToMemoryBlockMap.remove(taskId);
    +    }
    +    if (null != memoryBlocks) {
    +      Iterator<MemoryBlock> iterator = memoryBlocks.iterator();
    +      while (iterator.hasNext()) {
    +        allocator.free(iterator.next());
    +      }
    +    }
    +  }
    +
    +  public synchronized MemoryBlock allocateMemoryLazy(long taskId, long memoryRequested) {
    +    MemoryBlock allocate = allocator.allocate(memoryRequested);
    +    memoryUsed += allocate.size();
    +    Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
    +    if (null == listOfMemoryBlock) {
    +      listOfMemoryBlock = new HashSet<>();
    +      taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
    +    }
    +    listOfMemoryBlock.add(allocate);
    +    return allocate;
    +  }
    +
    +  /**
    +   * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
    +   */
    +  public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException {
    +    MemoryBlock baseBlock = null;
    +    int tries = 0;
    +    while (tries < 100) {
    +      baseBlock = INSTANCE.allocateMemory(taskId, size);
    +      if (baseBlock == null) {
    +        try {
    +          Thread.sleep(50);
    +        } catch (InterruptedException e) {
    +          throw new MemoryException(e);
    +        }
    +      } else {
    +        break;
    +      }
    +      tries++;
    +    }
    +    if (baseBlock == null) {
    +      throw new MemoryException("Not enough memory");
    +    }
    +    return baseBlock;
    +  }
    +
    +  private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
    +    if (memoryUsed + memoryRequested <= totalMemory) {
    --- End diff --
    
    ok
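The static initializer quoted above sizes the sort memory pool in four steps: parse the configured megabytes (falling back to the default on a parse error), raise anything below 1024 MB to 1024 MB, convert to bytes, and, when off-heap sort is disabled, cap the result at 60% of `Runtime.getRuntime().maxMemory()`. The same arithmetic as a pure function is sketched below. The method name `effectiveSortMemoryBytes` is hypothetical, the default and the max-heap size are passed in as parameters rather than read from `CarbonProperties`, and the sketch catches `NumberFormatException` (what `Long.parseLong` throws, including for `null`) instead of the quoted catch-all `Exception`.

```java
// Hypothetical pure-function version of the quoted sizing logic.
final class SortMemorySizing {
  private SortMemorySizing() {}

  // Configured MB -> bytes, with a floor of 1024 MB; for the on-heap allocator
  // the result is additionally capped at 60% of the JVM's max heap.
  static long effectiveSortMemoryBytes(String configuredMb, String defaultMb,
      boolean offHeap, long maxHeapBytes) {
    long size;
    try {
      size = Long.parseLong(configuredMb);
    } catch (NumberFormatException e) {
      size = Long.parseLong(defaultMb);
    }
    if (size < 1024) {
      size = 1024;
    }
    long takenSize = size * 1024 * 1024;
    if (!offHeap) {
      long maxMemory = maxHeapBytes * 60 / 100;
      if (takenSize > maxMemory) {
        takenSize = maxMemory;
      }
    }
    return takenSize;
  }
}
```

For example, with off-heap sort disabled and a JVM whose `maxMemory()` reports 2147483648 bytes (2 GB), a configured 4096 MB becomes 4294967296 bytes and is then capped at 2147483648 * 60 / 100 = 1288490188 bytes (about 1.2 GB). A configured 512 MB is first raised to 1024 MB, i.e. 1073741824 bytes.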



[GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128228002
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java ---
    @@ -0,0 +1,216 @@
    +  public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
    --- End diff --
    
    Please add log
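Alongside the logging request, note that the `freeMemoryAll` quoted earlier in this thread hands each block back to the allocator but never subtracts the freed sizes from `memoryUsed`. Since `memoryUsed` is private and nothing else in the quoted class lowers it for those blocks, the manager would keep counting that memory as used after a task is cleaned up, and later `allocateMemory` calls could fail even though the memory is free. The sketch below (hypothetical names `SortBlock` and `SortMemoryPool`; sizes only, no real allocator) removes a task's blocks, lowers the counter by the reclaimed total and logs the result through `java.util.logging`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical block: records only its size.
final class SortBlock {
  final long size;
  SortBlock(long size) { this.size = size; }
}

final class SortMemoryPool {
  private static final Logger LOGGER = Logger.getLogger(SortMemoryPool.class.getName());
  private final Map<Long, Set<SortBlock>> taskIdToMemoryBlockMap = new HashMap<>();
  private long memoryUsed;

  synchronized SortBlock allocate(long taskId, long size) {
    SortBlock block = new SortBlock(size);
    memoryUsed += size;
    taskIdToMemoryBlockMap.computeIfAbsent(taskId, k -> new HashSet<>()).add(block);
    return block;
  }

  // Releases every block the task still owns and keeps memoryUsed in step.
  // Returns the number of bytes reclaimed.
  synchronized long freeMemoryAll(long taskId) {
    Set<SortBlock> blocks = taskIdToMemoryBlockMap.remove(taskId);
    if (blocks == null) {
      return 0;
    }
    long reclaimed = 0;
    for (SortBlock block : blocks) {
      // A real manager would also hand the block back to its allocator here.
      reclaimed += block.size;
    }
    memoryUsed = Math.max(0, memoryUsed - reclaimed);
    LOGGER.info("Freed " + reclaimed + " bytes held by task " + taskId
        + ", total used now " + memoryUsed + " bytes");
    return reclaimed;
  }

  synchronized long memoryUsed() { return memoryUsed; }
}
```

This sketch holds the lock for the whole loop. The quoted version deliberately frees blocks outside the lock to keep the critical section short; that approach is still compatible with the fix as long as `memoryUsed` is decremented while the lock is held.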



[GitHub] carbondata issue #1185: [CARBONDATA-1318]Fixed Concurrent table data loading...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1185
  
    LGTM



[GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128233461
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java ---
    @@ -0,0 +1,216 @@
    +  private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
    +    if (memoryUsed + memoryRequested <= totalMemory) {
    --- End diff --
    
    add log
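The Javadoc on `allocateMemoryWithRetry` says it keeps retrying "until it allocates successfully", but the quoted loop gives up after 100 attempts spaced 50 ms apart, i.e. after roughly 100 × 50 ms = 5 seconds of sleeping, and then throws `MemoryException("Not enough memory")`. A sketch of the same bounded-retry pattern that also logs failed attempts, as the reviewer requests, follows. `tryAllocate` is a hypothetical `LongFunction` standing in for the synchronized `allocateMemory` (returning `null` when memory is unavailable), and `IllegalStateException` replaces CarbonData's `MemoryException` so the example is self-contained.

```java
import java.util.function.LongFunction;
import java.util.logging.Logger;

final class RetryAllocator {
  private static final Logger LOGGER = Logger.getLogger(RetryAllocator.class.getName());

  private RetryAllocator() {}

  // Calls tryAllocate up to maxTries times, sleeping sleepMillis between failures.
  // tryAllocate returns null when memory is not currently available.
  static <T> T allocateWithRetry(LongFunction<T> tryAllocate, long size,
      int maxTries, long sleepMillis) {
    for (int attempt = 1; attempt <= maxTries; attempt++) {
      T block = tryAllocate.apply(size);
      if (block != null) {
        return block;
      }
      LOGGER.fine("Allocation of " + size + " bytes failed (attempt " + attempt
          + " of " + maxTries + ")");
      if (attempt < maxTries) {
        try {
          Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt status
          throw new IllegalStateException("Interrupted while waiting for memory", e);
        }
      }
    }
    LOGGER.warning("Not enough memory for " + size + " bytes after " + maxTries + " attempts");
    throw new IllegalStateException("Not enough memory");
  }
}
```

Unlike the quoted loop, this sketch skips the sleep after the final failed attempt and restores the thread's interrupt flag before throwing.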



[GitHub] carbondata pull request #1185: [CARBONDATA-1318]Fixed Concurrent table data ...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128495543
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java ---
    @@ -0,0 +1,216 @@
    +  public synchronized MemoryBlock allocateMemoryLazy(long taskId, long memoryRequested) {
    +    MemoryBlock allocate = allocator.allocate(memoryRequested);
    +    memoryUsed += allocate.size();
    +    Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
    +    if (null == listOfMemoryBlock) {
    +      listOfMemoryBlock = new HashSet<>();
    +      taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
    +    }
    +    listOfMemoryBlock.add(allocate);
    +    return allocate;
    +  }
    +
    +  /**
    +   * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
    +   */
    +  public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException {
    +    MemoryBlock baseBlock = null;
    +    int tries = 0;
    +    while (tries < 100) {
    +      baseBlock = INSTANCE.allocateMemory(taskId, size);
    +      if (baseBlock == null) {
    +        try {
    +          Thread.sleep(50);
    +        } catch (InterruptedException e) {
    +          throw new MemoryException(e);
    +        }
    +      } else {
    +        break;
    +      }
    +      tries++;
    +    }
    +    if (baseBlock == null) {
    +      throw new MemoryException("Not enough memory");
    --- End diff --
    
    In case of concurrent loading, this issue may occur.
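This sketch shows why a bounded retry can still give up when several loads run at once. It uses a hypothetical `BoundedMemoryBudget` class, which is not part of CarbonData and only counts bytes. `tryReserve` plays the role of `allocateMemory` returning null. `reserveWithRetry` follows the shape of `allocateMemoryWithRetry`: a fixed number of tries with a sleep between them. Suppose another task holds its memory for longer than the whole retry window. The second caller then fails, even though that memory is released later.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified byte budget shared by concurrent loading tasks. */
class BoundedMemoryBudget {
  private final long totalMemory;
  private long memoryUsed;

  BoundedMemoryBudget(long totalMemory) {
    this.totalMemory = totalMemory;
  }

  /** Reserves the bytes if they fit; returns false instead of blocking. */
  synchronized boolean tryReserve(long bytes) {
    if (memoryUsed + bytes <= totalMemory) {
      memoryUsed += bytes;
      return true;
    }
    return false;
  }

  synchronized void release(long bytes) {
    memoryUsed = Math.max(0, memoryUsed - bytes);
  }

  synchronized long used() {
    return memoryUsed;
  }

  /**
   * Bounded retry: the lock is not held while sleeping, so other tasks can release
   * memory in between. If they hold it longer than maxTries * sleepMillis, this fails.
   */
  void reserveWithRetry(long bytes, int maxTries, long sleepMillis) {
    for (int tries = 0; tries < maxTries; tries++) {
      if (tryReserve(bytes)) {
        return;
      }
      try {
        TimeUnit.MILLISECONDS.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for memory", e);
      }
    }
    throw new IllegalStateException("Not enough memory after " + maxTries + " tries");
  }
}
```

The first task reserves 80 of 100 bytes. A second request for 60 bytes times out. The same request succeeds once the first task releases its memory.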


---

[GitHub] carbondata issue #1185: [WIP]Fixed Concurrent table data loading unsafe memo...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1185
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/3121/



---

[GitHub] carbondata issue #1185: [WIP]Fixed Concurrent table data loading unsafe memo...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1185
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/3117/



---

[GitHub] carbondata issue #1185: [CARBONDATA-1318]Fixed Concurrent table data loading...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1185
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/3127/



---

[GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128228749
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---
    @@ -75,76 +79,67 @@
     
       private MemoryAllocator allocator;
     
    -  private long minimumMemory;
    -
    -  // for debug purpose
    -  private Set<MemoryBlock> set = new HashSet<>();
    -
       private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
         this.totalMemory = totalMemory;
         this.allocator = allocator;
    -    long numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
    -    long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
    -    sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024;
    -    long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores;
    -    if (totalWorkingMemoryForAllThreads >= totalMemory) {
    -      throw new RuntimeException("Working memory should be less than total memory configured, "
    -          + "so either reduce the loading threads or increase the memory size. "
    -          + "(Number of threads * number of threads) should be less than total unsafe memory");
    -    }
    -    minimumMemory = totalWorkingMemoryForAllThreads;
    -    LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator
    -        + " and minimum reserve memory " + minimumMemory);
    +    LOGGER
    +        .info("Working Memory manager is created with size " + totalMemory + " with " + allocator);
       }
     
    -  private synchronized MemoryBlock allocateMemory(long memoryRequested) {
    +  private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
         if (memoryUsed + memoryRequested <= totalMemory) {
           MemoryBlock allocate = allocator.allocate(memoryRequested);
           memoryUsed += allocate.size();
    -      if (LOGGER.isDebugEnabled()) {
    -        set.add(allocate);
    -        LOGGER.error("Memory block (" + allocate + ") is created with size "  + allocate.size() +
    -            ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes");
    +      Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
    --- End diff --
    
    @ravipesala Debug log or info log?


---

[GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128228646
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---
    @@ -75,76 +79,67 @@
     
       private MemoryAllocator allocator;
     
    -  private long minimumMemory;
    -
    -  // for debug purpose
    -  private Set<MemoryBlock> set = new HashSet<>();
    -
       private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
         this.totalMemory = totalMemory;
         this.allocator = allocator;
    -    long numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
    -    long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
    -    sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024;
    -    long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores;
    -    if (totalWorkingMemoryForAllThreads >= totalMemory) {
    -      throw new RuntimeException("Working memory should be less than total memory configured, "
    -          + "so either reduce the loading threads or increase the memory size. "
    -          + "(Number of threads * number of threads) should be less than total unsafe memory");
    -    }
    -    minimumMemory = totalWorkingMemoryForAllThreads;
    -    LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator
    -        + " and minimum reserve memory " + minimumMemory);
    +    LOGGER
    +        .info("Working Memory manager is created with size " + totalMemory + " with " + allocator);
       }
     
    -  private synchronized MemoryBlock allocateMemory(long memoryRequested) {
    +  private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
         if (memoryUsed + memoryRequested <= totalMemory) {
           MemoryBlock allocate = allocator.allocate(memoryRequested);
           memoryUsed += allocate.size();
    -      if (LOGGER.isDebugEnabled()) {
    -        set.add(allocate);
    -        LOGGER.error("Memory block (" + allocate + ") is created with size "  + allocate.size() +
    -            ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes");
    +      Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
    --- End diff --
    
    ok


---

[GitHub] carbondata issue #1185: [CARBONDATA-1318]Fixed Concurrent table data loading...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1185
  
    Build Success with Spark 1.6, Please check CI http://144.76.159.231:8080/job/ApacheCarbonPRBuilder/559/



---

[GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128233127
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.memory;
    +
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +
    +/**
    + * Memory manager to keep track of
    + * all memory for storing the sorted data
    + */
    +public class UnsafeSortMemoryManager {
    +
    +  /**
    +   * logger
    +   */
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(UnsafeSortMemoryManager.class.getName());
    +
    +  /**
    +   * offheap is enabled
    +   */
    +  private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
    +      .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
    +          CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
    +
    +  /**
    +   * map to keep taskid to memory blocks
    +   */
    +  private static Map<Long, Set<MemoryBlock>> taskIdToMemoryBlockMap;
    +
    +  /**
    +   * singleton instance
    +   */
    +  public static final UnsafeSortMemoryManager INSTANCE;
    +
    +  /**
    +   * total memory available for sort data storage
    +   */
    +  private long totalMemory;
    +
    +  /**
    +   * current memory used
    +   */
    +  private long memoryUsed;
    +
    +  /**
    +   * current memory allocator
    +   */
    +  private MemoryAllocator allocator;
    +
    +  static {
    +    long size;
    +    try {
    +      size = Long.parseLong(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
    +              CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
    +    } catch (Exception e) {
    +      size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
    +      LOGGER.info("Wrong memory size given, " + "so setting default value to " + size);
    +    }
    +    if (size < 1024) {
    +      size = 1024;
    +      LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
    +          + "so setting default value to " + size);
    +    }
    +
    +    long takenSize = size * 1024 * 1024;
    +    MemoryAllocator allocator;
    +    if (offHeap) {
    +      allocator = MemoryAllocator.UNSAFE;
    +    } else {
    +      long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
    +      if (takenSize > maxMemory) {
    +        takenSize = maxMemory;
    +      }
    +      allocator = MemoryAllocator.HEAP;
    +    }
    +    INSTANCE = new UnsafeSortMemoryManager(takenSize, allocator);
    +    taskIdToMemoryBlockMap = new HashMap<>();
    +  }
    +
    +  private UnsafeSortMemoryManager(long totalMemory, MemoryAllocator allocator) {
    +    this.totalMemory = totalMemory;
    +    this.allocator = allocator;
    +    LOGGER.info("Sort Memory manager is created with size " + totalMemory + " with " + allocator);
    +  }
    +
    +  /**
    +   * Below method will be used to check whether memory required is
    +   * available or not
    +   *
    +   * @param required
    +   * @return if memory available
    +   */
    +  public synchronized boolean isMemoryAvailable(long required) {
    +    return memoryUsed + required < totalMemory;
    +  }
    +
    +  /**
    +   * Below method will be used to allocate dummy memory
    +   * this will be used to allocate first and then used when u need
    +   *
    +   * @param size
    +   */
    +  public synchronized void allocateDummyMemory(long size) {
    +    memoryUsed += size;
    +  }
    +
    +  /**
    +   * Get total memory available
    +   *
    +   * @return amount of memory available
    +   */
    +  public long getUsableMemory() {
    +    return totalMemory;
    +  }
    +
    +  public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
    +    allocator.free(memoryBlock);
    +    taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
    +    memoryUsed -= memoryBlock.size();
    +    memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
    +  }
    +
    +  public void freeMemoryAll(long taskId) {
    +    Set<MemoryBlock> memoryBlocks = null;
    +    synchronized (INSTANCE) {
    +      memoryBlocks = taskIdToMemoryBlockMap.remove(taskId);
    +    }
    +    if (null != memoryBlocks) {
    +      Iterator<MemoryBlock> iterator = memoryBlocks.iterator();
    +      while (iterator.hasNext()) {
    +        allocator.free(iterator.next());
    +      }
    +    }
    +  }
    +
    +  public synchronized MemoryBlock allocateMemoryLazy(long taskId, long memoryRequested) {
    +    MemoryBlock allocate = allocator.allocate(memoryRequested);
    +    memoryUsed += allocate.size();
    +    Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
    +    if (null == listOfMemoryBlock) {
    +      listOfMemoryBlock = new HashSet<>();
    +      taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
    +    }
    +    listOfMemoryBlock.add(allocate);
    +    return allocate;
    +  }
    +
    +  /**
    +   * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
    +   */
    +  public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException {
    +    MemoryBlock baseBlock = null;
    +    int tries = 0;
    +    while (tries < 100) {
    +      baseBlock = INSTANCE.allocateMemory(taskId, size);
    +      if (baseBlock == null) {
    +        try {
    +          Thread.sleep(50);
    +        } catch (InterruptedException e) {
    +          throw new MemoryException(e);
    +        }
    +      } else {
    +        break;
    +      }
    +      tries++;
    +    }
    +    if (baseBlock == null) {
    +      throw new MemoryException("Not enough memory");
    --- End diff --
    
    Ideally, this case should never occur.
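This error can also appear without a genuine memory shortage, if the shared counter is never given back. In the revision quoted above, `freeMemoryAll` removes the task's entry from `taskIdToMemoryBlockMap` and frees the blocks. It does not decrease `memoryUsed`, so if that is the only cleanup path for a task, the counter would keep growing across loads. This is a simplified model of per-task tracking in which task cleanup also returns the bytes to the budget. `TaskMemoryTracker` and `TrackedBlock` are hypothetical names, not CarbonData's API, and this is not the patch itself.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a MemoryBlock: only its size matters here. */
final class TrackedBlock {
  final long size;

  TrackedBlock(long size) {
    this.size = size;
  }
}

/** Hypothetical per-task tracker whose cleanup also releases the accounted bytes. */
class TaskMemoryTracker {
  private final long totalMemory;
  private long memoryUsed;
  private final Map<Long, Set<TrackedBlock>> taskIdToBlocks = new HashMap<>();

  TaskMemoryTracker(long totalMemory) {
    this.totalMemory = totalMemory;
  }

  /** Returns null when the request does not fit, like allocateMemory in the diff. */
  synchronized TrackedBlock allocate(long taskId, long size) {
    if (memoryUsed + size > totalMemory) {
      return null;
    }
    TrackedBlock block = new TrackedBlock(size);
    memoryUsed += size;
    taskIdToBlocks.computeIfAbsent(taskId, k -> new HashSet<>()).add(block);
    return block;
  }

  /** Drops every block owned by the task, returns its bytes to the budget, reports the total. */
  synchronized long freeAll(long taskId) {
    Set<TrackedBlock> blocks = taskIdToBlocks.remove(taskId);
    if (blocks == null) {
      return 0;
    }
    long released = 0;
    for (TrackedBlock block : blocks) {
      released += block.size;  // a real allocator would also free the underlying memory here
    }
    memoryUsed = Math.max(0, memoryUsed - released);
    return released;
  }

  synchronized long used() {
    return memoryUsed;
  }
}
```

The important step is in `freeAll`: without the `memoryUsed` update, a task that was cleaned up would still count against the budget. Later allocations could then fail even though nothing is held.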


---

[GitHub] carbondata issue #1185: [CARBONDATA-1318]Fixed Concurrent table data loading...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1185
  
    Build Success with Spark 1.6, Please check CI http://144.76.159.231:8080/job/ApacheCarbonPRBuilder/536/



---

[GitHub] carbondata pull request #1185: [CARBONDATA-1318]Fixed Concurrent table data ...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128408050
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java ---
    @@ -67,24 +73,25 @@ public int get(int rowId) {
         return pointerBlock[rowId];
       }
     
    -  public void loadToUnsafe() throws MemoryException {
    -    pointerMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(pointerBlock.length * 4);
    -    for (int i = 0; i < pointerBlock.length; i++) {
    -      CarbonUnsafe.unsafe
    -          .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4,
    -              pointerBlock[i]);
    +  public void loadToUnsafe() {
    +    try {
    +      pointerMemoryBlock =
    +          UnsafeSortMemoryManager.allocateMemoryWithRetry(this.taskId, pointerBlock.length * 4);
    +      for (int i = 0; i < pointerBlock.length; i++) {
    +        CarbonUnsafe.unsafe
    +            .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4,
    +                pointerBlock[i]);
    +      }
    +      pointerBlock = null;
    +    } catch (MemoryException e) {
    +      LOGGER.error("Not enough memory for allocating pointer buffer, sorting in heap");
    --- End diff --
    
    Change to `warn`.
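The suggestion concerns the fallback path in `loadToUnsafe`. When the pointer buffer cannot be allocated, the log message says sorting continues on the heap, so the condition is recoverable and a warning fits better than an error. This minimal sketch of that pattern makes two substitutions: a hypothetical `PointerStore` interface replaces `UnsafeSortMemoryManager`, and `java.util.logging` replaces CarbonData's `LogService`. `PointerBufferSketch` is likewise an illustrative name.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical managed-memory store; tryStore returns false when no memory is available. */
interface PointerStore {
  boolean tryStore(int[] pointers);

  int get(int rowId);
}

/** Hypothetical sketch: move row pointers into managed memory, else keep them on the heap. */
class PointerBufferSketch {
  private static final Logger LOGGER = Logger.getLogger(PointerBufferSketch.class.getName());
  private final PointerStore store;
  private int[] pointerBlock;

  PointerBufferSketch(int[] pointers, PointerStore store) {
    this.pointerBlock = pointers;
    this.store = store;
  }

  void loadToManagedMemory() {
    if (store.tryStore(pointerBlock)) {
      pointerBlock = null;  // the heap copy is no longer needed
    } else {
      // Recoverable: sorting continues on the heap, so WARNING rather than SEVERE
      LOGGER.log(Level.WARNING, "Not enough memory for allocating pointer buffer, sorting in heap");
    }
  }

  boolean isOnHeap() {
    return pointerBlock != null;
  }

  int get(int rowId) {
    return pointerBlock != null ? pointerBlock[rowId] : store.get(rowId);
  }
}
```

Callers read through `get` either way, so the fallback stays invisible to the sort apart from the logged warning.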


---

[GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128232969
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.memory;
    +
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +
    +/**
    + * Memory manager to keep track of
    + * all memory for storing the sorted data
    + */
    +public class UnsafeSortMemoryManager {
    +
    +  /**
    +   * logger
    +   */
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(UnsafeSortMemoryManager.class.getName());
    +
    +  /**
    +   * offheap is enabled
    +   */
    +  private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
    +      .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
    +          CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
    +
    +  /**
    +   * map to keep taskid to memory blocks
    +   */
    +  private static Map<Long, Set<MemoryBlock>> taskIdToMemoryBlockMap;
    +
    +  /**
    +   * singleton instance
    +   */
    +  public static final UnsafeSortMemoryManager INSTANCE;
    +
    +  /**
    +   * total memory available for sort data storage
    +   */
    +  private long totalMemory;
    +
    +  /**
    +   * current memory used
    +   */
    +  private long memoryUsed;
    +
    +  /**
    +   * current memory allocator
    +   */
    +  private MemoryAllocator allocator;
    +
    +  static {
    +    long size;
    +    try {
    +      size = Long.parseLong(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
    +              CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
    +    } catch (Exception e) {
    +      size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
    +      LOGGER.info("Wrong memory size given, " + "so setting default value to " + size);
    +    }
    +    if (size < 1024) {
    +      size = 1024;
    +      LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
    +          + "so setting default value to " + size);
    +    }
    +
    +    long takenSize = size * 1024 * 1024;
    +    MemoryAllocator allocator;
    +    if (offHeap) {
    +      allocator = MemoryAllocator.UNSAFE;
    +    } else {
    +      long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
    +      if (takenSize > maxMemory) {
    +        takenSize = maxMemory;
    +      }
    +      allocator = MemoryAllocator.HEAP;
    +    }
    +    INSTANCE = new UnsafeSortMemoryManager(takenSize, allocator);
    +    taskIdToMemoryBlockMap = new HashMap<>();
    +  }
    +
    +  private UnsafeSortMemoryManager(long totalMemory, MemoryAllocator allocator) {
    +    this.totalMemory = totalMemory;
    +    this.allocator = allocator;
    +    LOGGER.info("Sort Memory manager is created with size " + totalMemory + " with " + allocator);
    +  }
    +
    +  /**
    +   * Below method will be used to check whether memory required is
    +   * available or not
    +   *
    +   * @param required
    +   * @return if memory available
    +   */
    +  public synchronized boolean isMemoryAvailable(long required) {
    +    return memoryUsed + required < totalMemory;
    +  }
    +
    +  /**
    +   * Below method will be used to allocate dummy memory
    +   * this will be used to allocate first and then used when u need
    +   *
    +   * @param size
    +   */
    +  public synchronized void allocateDummyMemory(long size) {
    +    memoryUsed += size;
    +  }
    +
    +  /**
    +   * Get total memory available
    +   *
    +   * @return amount of memory available
    +   */
    +  public long getUsableMemory() {
    +    return totalMemory;
    +  }
    +
    +  public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
    +    allocator.free(memoryBlock);
    +    taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
    +    memoryUsed -= memoryBlock.size();
    +    memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
    +  }
    +
    +  public void freeMemoryAll(long taskId) {
    +    Set<MemoryBlock> memoryBlocks = null;
    +    synchronized (INSTANCE) {
    +      memoryBlocks = taskIdToMemoryBlockMap.remove(taskId);
    +    }
    +    if (null != memoryBlocks) {
    +      Iterator<MemoryBlock> iterator = memoryBlocks.iterator();
    +      while (iterator.hasNext()) {
    +        allocator.free(iterator.next());
    +      }
    +    }
    +  }
    +
    +  public synchronized MemoryBlock allocateMemoryLazy(long taskId, long memoryRequested) {
    +    MemoryBlock allocate = allocator.allocate(memoryRequested);
    --- End diff --
    
    Add a log here.
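Regarding the requested log: a log in `allocateMemoryLazy` could record the task id, the bytes requested, and the memory in use afterwards. As quoted, `allocateMemoryLazy` does not compare the request against `totalMemory`, so such a log helps show how far `memoryUsed` grows. This sketch rests on several assumptions:

- `LazySortMemorySketch` is a hypothetical class, not CarbonData's.
- A heap `byte[]` stands in for `MemoryBlock`.
- Messages are logged at `FINE` behind an `isLoggable` check, assuming per-allocation messages belong at debug level.
- `java.util.logging` stands in for CarbonData's logger.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch of lazy allocation with a per-allocation debug log. */
class LazySortMemorySketch {
  private static final Logger LOGGER = Logger.getLogger(LazySortMemorySketch.class.getName());
  private final long totalMemory;
  private long memoryUsed;

  LazySortMemorySketch(long totalMemory) {
    this.totalMemory = totalMemory;
  }

  /** Allocates without a budget check, as allocateMemoryLazy does, and logs the result. */
  synchronized byte[] allocateLazy(long taskId, int bytes) {
    byte[] block = new byte[bytes];  // heap stand-in for a MemoryBlock
    memoryUsed += bytes;
    if (LOGGER.isLoggable(Level.FINE)) {  // avoid building the message when debug is off
      LOGGER.fine("Task " + taskId + " allocated " + bytes + " bytes lazily; "
          + memoryUsed + " of " + totalMemory + " bytes now in use");
    }
    return block;
  }

  synchronized long used() {
    return memoryUsed;
  }

  synchronized boolean isOverBudget() {
    return memoryUsed > totalMemory;
  }
}
```

Because nothing stops the counter from passing `totalMemory`, logging the running total makes an over-commit visible when debug logging is enabled.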


---

[GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128233334
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.memory;
    +
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +
    +/**
    + * Memory manager to keep track of
    + * all memory for storing the sorted data
    + */
    +public class UnsafeSortMemoryManager {
    +
    +  /**
    +   * logger
    +   */
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(UnsafeSortMemoryManager.class.getName());
    +
    +  /**
    +   * offheap is enabled
    +   */
    +  private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
    +      .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
    +          CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
    +
    +  /**
    +   * map to keep taskid to memory blocks
    +   */
    +  private static Map<Long, Set<MemoryBlock>> taskIdToMemoryBlockMap;
    +
    +  /**
    +   * singleton instance
    +   */
    +  public static final UnsafeSortMemoryManager INSTANCE;
    +
    +  /**
    +   * total memory available for sort data storage
    +   */
    +  private long totalMemory;
    +
    +  /**
    +   * current memory used
    +   */
    +  private long memoryUsed;
    +
    +  /**
    +   * current memory allocator
    +   */
    +  private MemoryAllocator allocator;
    +
    +  static {
    +    long size;
    +    try {
    +      size = Long.parseLong(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
    +              CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
    +    } catch (Exception e) {
    +      size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
    +      LOGGER.info("Wrong memory size given, " + "so setting default value to " + size);
    +    }
    +    if (size < 1024) {
    +      size = 1024;
    +      LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
    +          + "so setting default value to " + size);
    +    }
    +
    +    long takenSize = size * 1024 * 1024;
    +    MemoryAllocator allocator;
    +    if (offHeap) {
    +      allocator = MemoryAllocator.UNSAFE;
    +    } else {
    +      long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
    +      if (takenSize > maxMemory) {
    +        takenSize = maxMemory;
    +      }
    +      allocator = MemoryAllocator.HEAP;
    +    }
    +    INSTANCE = new UnsafeSortMemoryManager(takenSize, allocator);
    +    taskIdToMemoryBlockMap = new HashMap<>();
    +  }
    +
    +  private UnsafeSortMemoryManager(long totalMemory, MemoryAllocator allocator) {
    +    this.totalMemory = totalMemory;
    +    this.allocator = allocator;
    +    LOGGER.info("Sort Memory manager is created with size " + totalMemory + " with " + allocator);
    +  }
    +
    +  /**
    +   * Below method will be used to check whether memory required is
    +   * available or not
    +   *
    +   * @param required
    +   * @return if memory available
    +   */
    +  public synchronized boolean isMemoryAvailable(long required) {
    +    return memoryUsed + required < totalMemory;
    +  }
    +
    +  /**
    +   * Below method will be used to allocate dummy memory
    +   * this will be used to allocate first and then used when u need
    +   *
    +   * @param size
    +   */
    +  public synchronized void allocateDummyMemory(long size) {
    +    memoryUsed += size;
    +  }
    +
    +  /**
    +   * Get total memory available
    +   *
    +   * @return amount of memory available
    +   */
    +  public long getUsableMemory() {
    +    return totalMemory;
    +  }
    +
    +  public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
    +    allocator.free(memoryBlock);
    +    taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
    +    memoryUsed -= memoryBlock.size();
    +    memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
    +  }
    +
    +  public void freeMemoryAll(long taskId) {
    +    Set<MemoryBlock> memoryBlocks = null;
    +    synchronized (INSTANCE) {
    +      memoryBlocks = taskIdToMemoryBlockMap.remove(taskId);
    +    }
    +    if (null != memoryBlocks) {
    +      Iterator<MemoryBlock> iterator = memoryBlocks.iterator();
    +      while (iterator.hasNext()) {
    +        allocator.free(iterator.next());
    +      }
    +    }
    +  }
    +
    +  public synchronized MemoryBlock allocateMemoryLazy(long taskId, long memoryRequested) {
    --- End diff --
    
    Can you add a comment explaining what it is used for?
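The size computation in the `UnsafeSortMemoryManager` static initializer quoted above can be sketched as a pure function, which makes its rules easier to test in isolation. This is a minimal sketch under stated assumptions, not the PR's code. `SortMemorySizing`, `effectiveBytes` and the `"1024"` default are hypothetical stand-ins, because the real value of `IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT` is not shown in the diff. The sketch follows the same rules as the quoted block:

- fall back to the default when the configured value is malformed;
- raise anything below 1024 MB to 1024 MB;
- convert megabytes to bytes;
- cap on-heap storage at 60% of the maximum heap.

```java
/**
 * Sketch of the sizing rules in the UnsafeSortMemoryManager static initializer.
 * Class and method names are hypothetical; only the rules mirror the quoted diff.
 */
final class SortMemorySizing {

  // Hypothetical stand-in for CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT;
  // the real default value is not shown in the diff.
  static final String DEFAULT_SIZE_IN_MB = "1024";

  // Minimum size in MB enforced by the static initializer.
  static final long MIN_SIZE_IN_MB = 1024L;

  private SortMemorySizing() {
  }

  /**
   * Returns the number of bytes the manager would reserve for sort data.
   *
   * @param configuredMb raw property value in MB, possibly null or malformed
   * @param offHeap      true when off-heap (unsafe) sort is enabled
   * @param maxHeapBytes value of Runtime.getRuntime().maxMemory()
   */
  static long effectiveBytes(String configuredMb, boolean offHeap, long maxHeapBytes) {
    long sizeInMb;
    try {
      sizeInMb = Long.parseLong(configuredMb);
    } catch (NumberFormatException e) {
      // Long.parseLong throws NumberFormatException for null or malformed input;
      // fall back to the default in that case.
      sizeInMb = Long.parseLong(DEFAULT_SIZE_IN_MB);
    }
    // Values below the minimum are raised to it.
    if (sizeInMb < MIN_SIZE_IN_MB) {
      sizeInMb = MIN_SIZE_IN_MB;
    }
    long bytes = sizeInMb * 1024 * 1024;
    if (!offHeap) {
      // On-heap storage is capped at 60% of the maximum heap size.
      long heapCap = maxHeapBytes * 60 / 100;
      bytes = Math.min(bytes, heapCap);
    }
    return bytes;
  }
}
```

Consider `effectiveBytes("2048", true, Runtime.getRuntime().maxMemory())`. In this sketch it returns 2048 MB expressed in bytes, because the heap cap applies only when off-heap sort is disabled.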


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] carbondata pull request #1185: [CARBONDATA-1318]Fixed Concurrent table data ...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128408599
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---
    @@ -454,6 +467,12 @@ class NewDataFrameLoaderRDD[K, V](
                 .printStatisticsInfo(model.getPartitionId)
             }
           }
    +      def clearMemory() {
    --- End diff --
    
    Move this to `CommonUtil.scala` and pass the task ID. Also remove the duplicate occurrences.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128228565
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java ---
    @@ -0,0 +1,216 @@
    +  public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
    --- End diff --
    
    ok
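The per-task bookkeeping behind `freeMemory(taskId, memoryBlock)` and `freeMemoryAll(taskId)` can be sketched without a real allocator. This is a minimal sketch, not the PR's implementation. `TaskMemoryTracker` and its nested `Block` class are hypothetical, and `Block` only records a size in place of `MemoryBlock`.

The sketch differs from the code quoted in this diff revision in two ways:

- `freeMemoryAll` as quoted does not adjust `memoryUsed`, whereas `freeAll` here subtracts the released bytes from the running total.
- `freeMemory` as quoted would throw a `NullPointerException` if it ran after the task's entry had been removed. `free` here treats that case as a no-op.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Sketch of task-scoped memory accounting in the spirit of UnsafeSortMemoryManager.
 * All names are hypothetical; no real memory is allocated.
 */
final class TaskMemoryTracker {

  /** Hypothetical stand-in for MemoryBlock: only its size matters here. */
  static final class Block {
    final long size;

    Block(long size) {
      this.size = size;
    }
  }

  private final long totalMemory;
  private long memoryUsed;
  // Blocks currently owned by each task, keyed by task ID.
  private final Map<Long, Set<Block>> taskIdToBlocks = new HashMap<>();

  TaskMemoryTracker(long totalMemory) {
    this.totalMemory = totalMemory;
  }

  /** Registers a block for the task if it fits in the budget; returns null otherwise. */
  synchronized Block allocate(long taskId, long size) {
    if (memoryUsed + size > totalMemory) {
      return null;
    }
    Block block = new Block(size);
    memoryUsed += size;
    taskIdToBlocks.computeIfAbsent(taskId, id -> new HashSet<>()).add(block);
    return block;
  }

  /** Frees one block; a no-op if the task or block is no longer tracked. */
  synchronized void free(long taskId, Block block) {
    Set<Block> blocks = taskIdToBlocks.get(taskId);
    if (blocks != null && blocks.remove(block)) {
      memoryUsed -= block.size;
      if (blocks.isEmpty()) {
        taskIdToBlocks.remove(taskId);
      }
    }
  }

  /** Frees every block still owned by the task, e.g. on task completion or cancellation. */
  synchronized void freeAll(long taskId) {
    Set<Block> blocks = taskIdToBlocks.remove(taskId);
    if (blocks != null) {
      for (Block block : blocks) {
        memoryUsed -= block.size;
      }
    }
  }

  /** Bytes currently accounted as in use. */
  synchronized long used() {
    return memoryUsed;
  }
}
```

Keeping the accounting and the map update in the same synchronized method means a task cleanup racing with a late `free` cannot double-count or throw.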



[GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128228585
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---
    @@ -75,76 +79,67 @@
     
       private MemoryAllocator allocator;
     
    -  private long minimumMemory;
    -
    -  // for debug purpose
    -  private Set<MemoryBlock> set = new HashSet<>();
    -
       private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
         this.totalMemory = totalMemory;
         this.allocator = allocator;
    -    long numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
    -    long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
    -    sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024;
    -    long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores;
    -    if (totalWorkingMemoryForAllThreads >= totalMemory) {
    -      throw new RuntimeException("Working memory should be less than total memory configured, "
    -          + "so either reduce the loading threads or increase the memory size. "
    -          + "(Number of threads * number of threads) should be less than total unsafe memory");
    -    }
    -    minimumMemory = totalWorkingMemoryForAllThreads;
    -    LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator
    -        + " and minimum reserve memory " + minimumMemory);
    +    LOGGER
    +        .info("Working Memory manager is created with size " + totalMemory + " with " + allocator);
       }
     
    -  private synchronized MemoryBlock allocateMemory(long memoryRequested) {
    +  private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
         if (memoryUsed + memoryRequested <= totalMemory) {
           MemoryBlock allocate = allocator.allocate(memoryRequested);
           memoryUsed += allocate.size();
    -      if (LOGGER.isDebugEnabled()) {
    -        set.add(allocate);
    -        LOGGER.error("Memory block (" + allocate + ") is created with size "  + allocate.size() +
    -            ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes");
    +      Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
    +      if (null == listOfMemoryBlock) {
    +        listOfMemoryBlock = new HashSet<>();
    +        taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
           }
    +      listOfMemoryBlock.add(allocate);
           return allocate;
         }
         return null;
       }
     
    -  public synchronized void freeMemory(MemoryBlock memoryBlock) {
    +  public synchronized void freeMemory(long taskId,MemoryBlock memoryBlock) {
    +    taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
    --- End diff --
    
    ok



[GitHub] carbondata issue #1185: [CARBONDATA-1318]Fixed Concurrent table data loading...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1185
  
    Build Success with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/3152/




[GitHub] carbondata pull request #1185: [CARBONDATA-1318]Fixed Concurrent table data ...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128439933
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---
    @@ -454,6 +467,12 @@ class NewDataFrameLoaderRDD[K, V](
                 .printStatisticsInfo(model.getPartitionId)
             }
           }
    +      def clearMemory() {
    --- End diff --
    
    ok



[GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128227115
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---
    @@ -75,76 +79,67 @@
     
       private MemoryAllocator allocator;
     
    -  private long minimumMemory;
    -
    -  // for debug purpose
    -  private Set<MemoryBlock> set = new HashSet<>();
    -
       private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
         this.totalMemory = totalMemory;
         this.allocator = allocator;
    -    long numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
    -    long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
    -    sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024;
    -    long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores;
    -    if (totalWorkingMemoryForAllThreads >= totalMemory) {
    -      throw new RuntimeException("Working memory should be less than total memory configured, "
    -          + "so either reduce the loading threads or increase the memory size. "
    -          + "(Number of threads * number of threads) should be less than total unsafe memory");
    -    }
    -    minimumMemory = totalWorkingMemoryForAllThreads;
    -    LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator
    -        + " and minimum reserve memory " + minimumMemory);
    +    LOGGER
    +        .info("Working Memory manager is created with size " + totalMemory + " with " + allocator);
       }
     
    -  private synchronized MemoryBlock allocateMemory(long memoryRequested) {
    +  private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
         if (memoryUsed + memoryRequested <= totalMemory) {
           MemoryBlock allocate = allocator.allocate(memoryRequested);
           memoryUsed += allocate.size();
    -      if (LOGGER.isDebugEnabled()) {
    -        set.add(allocate);
    -        LOGGER.error("Memory block (" + allocate + ") is created with size "  + allocate.size() +
    -            ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes");
    +      Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
    --- End diff --
    
    Better to add a log showing how much memory has been allocated so far and how much is left.
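The logging suggested above could look like the following minimal sketch. It is not the PR's implementation: `LoggedBudget`, `status` and the per-task byte counter are hypothetical, and `java.util.logging.Logger` stands in for CarbonData's `LogService`. Like the quoted `allocateMemory`, it accepts a request when `memoryUsed + memoryRequested <= totalMemory`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Sketch of an allocation path that logs how much memory is used and how much
 * is left after each request. All names are hypothetical.
 */
final class LoggedBudget {

  private static final Logger LOGGER = Logger.getLogger(LoggedBudget.class.getName());

  private final long totalMemory;
  private long memoryUsed;
  // Bytes currently held by each task (a simplification of the task-to-block map).
  private final Map<Long, Long> taskIdToBytes = new HashMap<>();

  LoggedBudget(long totalMemory) {
    this.totalMemory = totalMemory;
  }

  /** Builds the status message logged after a successful allocation. */
  static String status(long taskId, long allocated, long used, long total) {
    return "Allocated " + allocated + " bytes for task " + taskId
        + ". Total memory used " + used + " bytes, left " + (total - used) + " bytes";
  }

  /** Reserves the bytes and logs the new totals if they fit; otherwise logs a warning. */
  synchronized boolean allocate(long taskId, long requested) {
    if (memoryUsed + requested > totalMemory) {
      LOGGER.warning("Cannot allocate " + requested + " bytes for task " + taskId
          + "; only " + (totalMemory - memoryUsed) + " bytes left");
      return false;
    }
    memoryUsed += requested;
    // Add the request to whatever the task already holds.
    taskIdToBytes.merge(taskId, requested, Long::sum);
    LOGGER.info(status(taskId, requested, memoryUsed, totalMemory));
    return true;
  }

  /** Bytes currently attributed to the task (0 if none). */
  synchronized long bytesHeldBy(long taskId) {
    return taskIdToBytes.getOrDefault(taskId, 0L);
  }
}
```

Logging at info level on every allocation may be noisy under heavy load. A real implementation might prefer debug level for successes and reserve warnings for rejected requests.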



[GitHub] carbondata issue #1185: [WIP]Fixed Concurrent table data loading unsafe memo...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1185
  
    Build Success with Spark 1.6. Please check CI: http://144.76.159.231:8080/job/ApacheCarbonPRBuilder/530/




[GitHub] carbondata pull request #1185: [CARBONDATA-1318]Fixed Concurrent table data ...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 closed the pull request at:

    https://github.com/apache/carbondata/pull/1185

