You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by kumarvishal09 <gi...@git.apache.org> on 2017/07/19 14:36:03 UTC

[GitHub] carbondata pull request #1185: [CARBONDATA-1318]Fixed Concurrent table data ...

Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1185#discussion_r128264917
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.memory;
    +
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +
    +/**
    + * Memory manager to keep track of
    + * all memory for storing the sorted data
    + */
    +public class UnsafeSortMemoryManager {
    +
    +  /**
    +   * logger
    +   */
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(UnsafeSortMemoryManager.class.getName());
    +
    +  /**
    +   * offheap is enabled
    +   */
    +  private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
    +      .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
    +          CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
    +
    +  /**
    +   * map to keep taskid to memory blocks
    +   */
    +  private static Map<Long, Set<MemoryBlock>> taskIdToMemoryBlockMap;
    +
    +  /**
    +   * singleton instance
    +   */
    +  public static final UnsafeSortMemoryManager INSTANCE;
    +
    +  /**
    +   * total memory available for sort data storage
    +   */
    +  private long totalMemory;
    +
    +  /**
    +   * current memory used
    +   */
    +  private long memoryUsed;
    +
    +  /**
    +   * current memory allocator
    +   */
    +  private MemoryAllocator allocator;
    +
    +  static {
    +    long size;
    +    try {
    +      size = Long.parseLong(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
    +              CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
    +    } catch (Exception e) {
    +      size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
    +      LOGGER.info("Wrong memory size given, " + "so setting default value to " + size);
    +    }
    +    if (size < 1024) {
    +      size = 1024;
    +      LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
    +          + "so setting default value to " + size);
    +    }
    +
    +    long takenSize = size * 1024 * 1024;
    +    MemoryAllocator allocator;
    +    if (offHeap) {
    +      allocator = MemoryAllocator.UNSAFE;
    +    } else {
    +      long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
    +      if (takenSize > maxMemory) {
    +        takenSize = maxMemory;
    +      }
    +      allocator = MemoryAllocator.HEAP;
    +    }
    +    INSTANCE = new UnsafeSortMemoryManager(takenSize, allocator);
    +    taskIdToMemoryBlockMap = new HashMap<>();
    +  }
    +
    +  private UnsafeSortMemoryManager(long totalMemory, MemoryAllocator allocator) {
    +    this.totalMemory = totalMemory;
    +    this.allocator = allocator;
    +    LOGGER.info("Sort Memory manager is created with size " + totalMemory + " with " + allocator);
    +  }
    +
    +  /**
    +   * Below method will be used to check whether memory required is
    +   * available or not
    +   *
    +   * @param required
    +   * @return if memory available
    +   */
    +  public synchronized boolean isMemoryAvailable(long required) {
    +    return memoryUsed + required < totalMemory;
    +  }
    +
    +  /**
    +   * Below method will be used to allocate dummy memory
    +   * this will be used to allocate first and then used when u need
    +   *
    +   * @param size
    +   */
    +  public synchronized void allocateDummyMemory(long size) {
    +    memoryUsed += size;
    +  }
    +
    +  /**
    +   * Get total memory available
    +   *
    +   * @return amount of memory available
    +   */
    +  public long getUsableMemory() {
    +    return totalMemory;
    +  }
    +
    +  public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
    +    allocator.free(memoryBlock);
    +    taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
    +    memoryUsed -= memoryBlock.size();
    +    memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
    +  }
    +
    +  public void freeMemoryAll(long taskId) {
    +    Set<MemoryBlock> memoryBlocks = null;
    +    synchronized (INSTANCE) {
    +      memoryBlocks = taskIdToMemoryBlockMap.remove(taskId);
    +    }
    +    if (null != memoryBlocks) {
    +      Iterator<MemoryBlock> iterator = memoryBlocks.iterator();
    +      while (iterator.hasNext()) {
    +        allocator.free(iterator.next());
    +      }
    +    }
    +  }
    +
    +  public synchronized MemoryBlock allocateMemoryLazy(long taskId, long memoryRequested) {
    +    MemoryBlock allocate = allocator.allocate(memoryRequested);
    +    memoryUsed += allocate.size();
    +    Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
    +    if (null == listOfMemoryBlock) {
    +      listOfMemoryBlock = new HashSet<>();
    +      taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
    +    }
    +    listOfMemoryBlock.add(allocate);
    +    return allocate;
    +  }
    +
    +  /**
    +   * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
    +   */
    +  public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException {
    +    MemoryBlock baseBlock = null;
    +    int tries = 0;
    +    while (tries < 100) {
    +      baseBlock = INSTANCE.allocateMemory(taskId, size);
    +      if (baseBlock == null) {
    +        try {
    +          Thread.sleep(50);
    +        } catch (InterruptedException e) {
    +          throw new MemoryException(e);
    +        }
    +      } else {
    +        break;
    +      }
    +      tries++;
    +    }
    +    if (baseBlock == null) {
    +      throw new MemoryException("Not enough memory");
    +    }
    +    return baseBlock;
    +  }
    +
    +  private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
    +    if (memoryUsed + memoryRequested <= totalMemory) {
    --- End diff --
    
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---