Posted to notifications@nemo.apache.org by GitBox <gi...@apache.org> on 2020/07/08 17:36:54 UTC

[GitHub] [incubator-nemo] codinggosu opened a new pull request #298: Memory manager

codinggosu opened a new pull request #298:
URL: https://github.com/apache/incubator-nemo/pull/298


   JIRA: [NEMO-455: MemoryManager to keep track of remaining storage, execution memory](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-455)
   
   **Major changes:**
   - Adds MemoryManager, which splits each executor's memory into two separate pools: execution and storage
   - Makes BlockOutputWriter write to a temporary vector before the data is committed
   
   **Minor changes to note:**
   - 
   
   **Tests for the changes:**
   - Tests for SizeEstimator are complete; tests for SizeTrackingVector and MemoryManager still need to be written
   
   **Other comments:**
   - This is a WIP. It will not pass tests until PR #297 has been merged and the remaining pieces have been added
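
The pool split described under "Major changes" could look roughly like the following. This is a minimal sketch, not the PR's implementation: the class name `MemoryManagerSketch`, the constructor arguments, and `releaseStorageMemory` are assumptions made for illustration. Only `acquireStorageMemory` and `getRemainingStorageMemory` are names that appear in the diff under review, and their bodies here are guesses.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the PR's MemoryManager: two fixed-size pools per executor. */
final class MemoryManagerSketch {
  private final long storagePoolSize;
  private final long executionPoolSize;
  private long storageUsed = 0L;
  // Bytes reserved per block id, so a release returns exactly what was acquired.
  private final Map<String, Long> storageReservations = new HashMap<>();

  /** Assumption: the split is a fixed fraction of the total, chosen at construction. */
  MemoryManagerSketch(final long totalBytes, final double storageFraction) {
    this.storagePoolSize = (long) (totalBytes * storageFraction);
    this.executionPoolSize = totalBytes - storagePoolSize;
  }

  /** Reserves storage memory for a block; returns false if the pool cannot hold it. */
  synchronized boolean acquireStorageMemory(final String blockId, final long bytes) {
    if (bytes < 0 || bytes > storagePoolSize - storageUsed) {
      return false;
    }
    storageUsed += bytes;
    storageReservations.merge(blockId, bytes, Long::sum);
    return true;
  }

  /** Hypothetical counterpart to acquire: gives back everything reserved for the block. */
  synchronized void releaseStorageMemory(final String blockId) {
    final Long reserved = storageReservations.remove(blockId);
    if (reserved != null) {
      storageUsed -= reserved;
    }
  }

  synchronized long getRemainingStorageMemory() {
    return storagePoolSize - storageUsed;
  }

  long getExecutionPoolSize() {
    return executionPoolSize;
  }
}
```

A caller would check the return value of `acquireStorageMemory` before committing a block to memory and fall back to another store when it returns false.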
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] codinggosu commented on a change in pull request #298: [NEMO-455] (WIP) MemoryManager to keep track of remaining storage, execution memory

Posted by GitBox <gi...@apache.org>.
codinggosu commented on a change in pull request #298:
URL: https://github.com/apache/incubator-nemo/pull/298#discussion_r456470441



##########
File path: dongjooals.sh
##########
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash

Review comment:
       I will remove all .sh files that I created before the final change.







[GitHub] [incubator-nemo] jeongyooneo closed pull request #298: [NEMO-455] (WIP) MemoryManager to keep track of remaining storage, execution memory

Posted by GitBox <gi...@apache.org>.
jeongyooneo closed pull request #298:
URL: https://github.com/apache/incubator-nemo/pull/298


   








[GitHub] [incubator-nemo] codinggosu commented on a change in pull request #298: Memory manager

Posted by GitBox <gi...@apache.org>.
codinggosu commented on a change in pull request #298:
URL: https://github.com/apache/incubator-nemo/pull/298#discussion_r451715371



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
##########
@@ -102,9 +107,17 @@ private Executor(@Parameter(JobConf.ExecutorId.class) final String executorId,
     this.intermediateDataIOFactory = intermediateDataIOFactory;
     this.broadcastManagerWorker = broadcastManagerWorker;
     this.metricMessageSender = metricMessageSender;
+    // dongjoo: one memory manager per executor
+//    this.memoryManager = new MemoryManager();

Review comment:
       these comments will be deleted later
   







[GitHub] [incubator-nemo] jeongyooneo commented on a change in pull request #298: [NEMO-455] (WIP) MemoryManager to keep track of remaining storage, execution memory

Posted by GitBox <gi...@apache.org>.
jeongyooneo commented on a change in pull request #298:
URL: https://github.com/apache/incubator-nemo/pull/298#discussion_r456371148



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
##########
@@ -88,6 +86,10 @@
   private final Map<String, AtomicInteger> blockToRemainingRead;
   private final BlockTransferThrottler blockTransferThrottler;
 
+  //dongjoo
+  private final MemoryManager memoryManager;
+  // a map of memoryStore blocks to newly created spilled blocks
+  private final HashMap<String, Block> spilledBlocks;

Review comment:
       What are the keys in this map? They do not seem to be `memoryStore blocks`.

##########
File path: common/src/main/java/org/apache/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
##########
@@ -48,6 +48,15 @@
   private final Map<Class<? extends ExecutionProperty>, T> properties = new HashMap<>();
   private final Set<Class<? extends ExecutionProperty>> finalizedProperties = new HashSet<>();
 
+  // only change an execution property from a memoryStore to a local file store

Review comment:
       Changing an ExecutionProperty is a deliberate action of Nemo's policy layer. Changing it during job execution may require rewriting the PhysicalPlan.

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java
##########
@@ -52,6 +54,16 @@ public RuntimeEdge(final String runtimeEdgeId,
     this.executionProperties = executionProperties;
   }
 
+  /**
+   * change the data store property of an edge, used for caching when there is not enough memory.
+   * (ex: MEMORY_STORE -> LOCAL_FILE_STORE)
+   * @param value the new DataStoreProperty.Value
+   */
+  public void changeDataStoreProperty(final DataStoreProperty.Value value) {

Review comment:
       Ditto. ExecutionProperty values shouldn't be changed at runtime; otherwise we would have to stop execution -> rewrite the PhysicalPlan -> resume execution.

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
##########
@@ -203,11 +224,17 @@ public Block createBlock(final String blockId,
           "Block " + blockIdWildcard + " location unknown: "
             + "The block state is " + blockLocationInfoMsg.getState()));
       }
+      LOG.info("readblock, msg. get block id {}", blockLocationInfoMsg.getBlockId());
 
       // This is the executor id that we wanted to know
       final String blockId = blockLocationInfoMsg.getBlockId();
       final String targetExecutorId = blockLocationInfoMsg.getOwnerExecutorId();
-      final DataStoreProperty.Value blockStore = edgeProperties.get(DataStoreProperty.class).get();
+      DataStoreProperty.Value blockStore = edgeProperties.get(DataStoreProperty.class).get();
+      if (this.spilledBlocks.containsKey(blockId)) {

Review comment:
       Ditto. ExecutionProperty values shouldn't be changed at runtime; otherwise we would have to stop execution -> rewrite the PhysicalPlan -> resume execution.

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
##########
@@ -60,7 +71,8 @@ public OutputWriter createWriter(final String srcTaskId,
       return new PipeOutputWriter(srcTaskId, runtimeEdge, pipeManagerWorker);
     } else {
       final StageEdge stageEdge = (StageEdge) runtimeEdge;
-      return new BlockOutputWriter(srcTaskId, stageEdge.getDstIRVertex(), runtimeEdge, blockManagerWorker);
+      return new BlockOutputWriter(
+        srcTaskId, stageEdge.getDstIRVertex(), runtimeEdge, blockManagerWorker, memoryManager);

Review comment:
       Since BlockManagerWorker already holds the MemoryManager, I don't think we need to pass memoryManager when creating BlockOutputWriter (maybe use `getMemoryManager()`?). If so, we don't need to import MemoryManager here.
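
The suggestion above can be sketched as follows. All three classes are hypothetical stand-ins with a `Sketch`/`Stub` suffix, not the real Nemo classes, and the getter is the accessor the reviewer proposes rather than one confirmed to exist in the PR.

```java
/** Hypothetical stand-in for the PR's MemoryManager. */
final class MemoryManagerStub {
}

/** Stand-in for BlockManagerWorker: it owns the manager and exposes it through a getter. */
final class BlockManagerWorkerSketch {
  private final MemoryManagerStub memoryManager;

  BlockManagerWorkerSketch(final MemoryManagerStub memoryManager) {
    this.memoryManager = memoryManager;
  }

  MemoryManagerStub getMemoryManager() {
    return memoryManager;
  }
}

/** Stand-in for BlockOutputWriter: no MemoryManager parameter in the constructor. */
final class BlockOutputWriterSketch {
  private final MemoryManagerStub memoryManager;

  BlockOutputWriterSketch(final BlockManagerWorkerSketch blockManagerWorker) {
    // The writer pulls the manager from the worker it already receives.
    this.memoryManager = blockManagerWorker.getMemoryManager();
  }

  MemoryManagerStub memoryManager() {
    return memoryManager;
  }
}
```

With this shape the factory that creates the writer only needs the worker, so it no longer has to know about the memory manager type.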

##########
File path: dongjooals.sh
##########
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash

Review comment:
       Please remove this file, or add .sh to the .gitignore file.

##########
File path: make.sh
##########
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash

Review comment:
       Same here

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java
##########
@@ -34,7 +35,8 @@
  * @param <V> the vertex type.
  */
 public class RuntimeEdge<V extends Vertex> extends Edge<V> {
-  private final ExecutionPropertyMap<EdgeExecutionProperty> executionProperties;
+  /// drop final in executionProperties
+  private ExecutionPropertyMap<EdgeExecutionProperty> executionProperties;

Review comment:
       ExecutionPropertyMap should be final

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
##########
@@ -74,21 +83,51 @@
       .orElseThrow(() -> new RuntimeException("No data store property on the edge"));
     blockToWrite = blockManagerWorker.createBlock(
       RuntimeIdManager.generateBlockId(runtimeEdge.getId(), srcTaskId), blockStoreValue);
+    potentialSpilledBlocktoWrite = blockManagerWorker.createBlock(

Review comment:
       Why an empty Block?
   Please check the next comment.

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
##########
@@ -74,21 +83,51 @@
       .orElseThrow(() -> new RuntimeException("No data store property on the edge"));
     blockToWrite = blockManagerWorker.createBlock(
       RuntimeIdManager.generateBlockId(runtimeEdge.getId(), srcTaskId), blockStoreValue);
+    potentialSpilledBlocktoWrite = blockManagerWorker.createBlock(
+      RuntimeIdManager.generateBlockId(runtimeEdge.getId(), srcTaskId), DataStoreProperty.Value.LOCAL_FILE_STORE);
+    LOG.info("dongjoo, BlockOutputWriter constructor, stageEdge {}, runtimeEdge {}, blockToWrite {}",
+      stageEdge, runtimeEdge, blockToWrite);
     final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
       runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
     nonDummyBlock = !duplicateDataProperty.isPresent()
       || duplicateDataProperty.get().getRepresentativeEdgeId().equals(runtimeEdge.getId())
       || duplicateDataProperty.get().getGroupSize() <= 1;
+
+    //dongjoo
+    this.memoryManager = memoryManager;
+    this.sizeTrackingVector = new SizeTrackingVector();
+
   }
 
+
   @Override
   public void write(final Object element) {
+//    LOG.info("type of partitioner, key: {} and type of block {}", partitioner.partition(element), blockStoreValue);
     if (nonDummyBlock) {
-      blockToWrite.write(partitioner.partition(element), element);
-
+      // logging
+//      LOG.info("BlockOutPutWriter write, blockid {},blocktoWrite {}, blockStoreValue {}",
+//        blockToWrite.getId(), blockToWrite, blockStoreValue);
+//      LOG.info("written element {}", element);
+      // loggin common end
+      // if blockStore is caching to Memory, write to temporary SizeTrackingVector to ensure OOM doesn't occur,
+      // actual "writing" will occur in { close }
+      if (blockStoreValue == DataStoreProperty.Value.MEMORY_STORE
+        || blockStoreValue == DataStoreProperty.Value.SERIALIZED_MEMORY_STORE) {
+//        LOG.info("only for memory store append to STV");
+        sizeTrackingVector.append(element);

Review comment:
       How about maintaining a map from `BlockId` (or the key of a Partition?) to `sizeTrackingVector`, instead of reusing a single `sizeTrackingVector` for arbitrary Blocks?
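
A minimal sketch of the per-block bookkeeping suggested above, under stated assumptions: `PerBlockBufferSketch` and `TrackedVector` are hypothetical names, and `TrackedVector` only imitates `SizeTrackingVector` by summing a caller-supplied byte count instead of estimating object sizes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** One size-tracking buffer per block id instead of a single shared buffer. */
final class PerBlockBufferSketch {

  /** Hypothetical stand-in for SizeTrackingVector. */
  static final class TrackedVector {
    private final List<Object> elements = new ArrayList<>();
    private long estimatedBytes = 0L;

    void append(final Object element, final long bytes) {
      elements.add(element);
      estimatedBytes += bytes;
    }

    long estimateSize() {
      return estimatedBytes;
    }

    List<Object> elements() {
      return elements;
    }
  }

  private final Map<String, TrackedVector> buffers = new HashMap<>();

  /** Appends to the buffer of the given block, creating it on first use. */
  void append(final String blockId, final Object element, final long bytes) {
    buffers.computeIfAbsent(blockId, id -> new TrackedVector()).append(element, bytes);
  }

  /** Estimated size of one block's buffered data; 0 if nothing is buffered. */
  long estimateSize(final String blockId) {
    final TrackedVector vector = buffers.get(blockId);
    return vector == null ? 0L : vector.estimateSize();
  }

  /** Hands the buffer over (for example at close time) and forgets it. */
  TrackedVector remove(final String blockId) {
    return buffers.remove(blockId);
  }
}
```

Keeping one buffer per block means the size passed to the memory manager at commit time reflects only that block's elements.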

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
##########
@@ -74,21 +83,51 @@
       .orElseThrow(() -> new RuntimeException("No data store property on the edge"));
     blockToWrite = blockManagerWorker.createBlock(
       RuntimeIdManager.generateBlockId(runtimeEdge.getId(), srcTaskId), blockStoreValue);
+    potentialSpilledBlocktoWrite = blockManagerWorker.createBlock(
+      RuntimeIdManager.generateBlockId(runtimeEdge.getId(), srcTaskId), DataStoreProperty.Value.LOCAL_FILE_STORE);
+    LOG.info("dongjoo, BlockOutputWriter constructor, stageEdge {}, runtimeEdge {}, blockToWrite {}",
+      stageEdge, runtimeEdge, blockToWrite);
     final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
       runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
     nonDummyBlock = !duplicateDataProperty.isPresent()
       || duplicateDataProperty.get().getRepresentativeEdgeId().equals(runtimeEdge.getId())
       || duplicateDataProperty.get().getGroupSize() <= 1;
+
+    //dongjoo
+    this.memoryManager = memoryManager;
+    this.sizeTrackingVector = new SizeTrackingVector();
+
   }
 
+
   @Override
   public void write(final Object element) {
+//    LOG.info("type of partitioner, key: {} and type of block {}", partitioner.partition(element), blockStoreValue);
     if (nonDummyBlock) {
-      blockToWrite.write(partitioner.partition(element), element);
-
+      // logging
+//      LOG.info("BlockOutPutWriter write, blockid {},blocktoWrite {}, blockStoreValue {}",
+//        blockToWrite.getId(), blockToWrite, blockStoreValue);
+//      LOG.info("written element {}", element);
+      // loggin common end
+      // if blockStore is caching to Memory, write to temporary SizeTrackingVector to ensure OOM doesn't occur,
+      // actual "writing" will occur in { close }
+      if (blockStoreValue == DataStoreProperty.Value.MEMORY_STORE
+        || blockStoreValue == DataStoreProperty.Value.SERIALIZED_MEMORY_STORE) {
+//        LOG.info("only for memory store append to STV");
+        sizeTrackingVector.append(element);
+//        LOG.info("sizeTrackingVector size: {}", sizeTrackingVector.estimateSize());
+//        memoryManager.acquireStorageMemory(blockToWrite.getId(), SizeEstimator.estimate(element));
+      } else {
+        // original logic
+        // dongjoo: partioner.partition returns key of the partition
+//        LOG.info("not a memory store block, just write");
+        blockToWrite.write(partitioner.partition(element), element);
+      }
+      // other logic, not write or append to SizeTrackingVector, common operation?
       final DedicatedKeyPerElement dedicatedKeyPerElement =
         partitioner.getClass().getAnnotation(DedicatedKeyPerElement.class);
       if (dedicatedKeyPerElement != null) {
+        LOG.info("COMMITPARTITIONS CALLED BECAUSE OF DEDICATED KEY");
         blockToWrite.commitPartitions();

Review comment:
       In the DedicatedKeyPerElement case, size tracking is not enabled for some reason. Is this intended? If so, why?

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/SizeEstimator.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A class to estimate the size of Objects.
+ * necessary for implementing caching and determining memory status
+ */
+public final class SizeEstimator {
+  private static final Logger LOG = LoggerFactory.getLogger(SizeEstimator.class.getName());
+  private SizeEstimator() {
+    // not called
+  }
+
+//  static {
+//    initialize();
+//  }
+
+
+  // Sizes of primitive types
+  private static final int BOOLEAN_SIZE = 1;
+  private static final int CHAR_SIZE = 2;
+  private static final int SHORT_SIZE = 2;
+  private static final int INT_SIZE = 4;
+  private static final int LONG_SIZE = 8;
+  private static final int FLOAT_SIZE = 4;
+  private static final int BYTE_SIZE = 1;
+  private static final int DOUBLE_SIZE = 8;
+  // architecture dependent sizes
+  private static final int ALIGN_SIZE = 8;
+  private static boolean is64Bit = true;
+  private static int objectSize = 16; // 12 bytes with 8 byte offset
+  private static int pointerSize = 8;
+  private static boolean isCompressedOops = false;
+
+
+  // cache of classInfos
+  private static WeakHashMap<Class<?>, ClassInfo> classInfos = new WeakHashMap<Class<?>, ClassInfo>();
+
+
+  public static void initialize() {
+    String architecture = System.getProperty("os.arch");
+    is64Bit = architecture.contains("64") || architecture.contains("s390x");
+    objectSize = is64Bit ? 16 : 8;
+    pointerSize = is64Bit ? 8 : 4;
+    classInfos.clear();
+    LOG.info("Size Estimator initizlied {} ", System.getProperty("sun.arch.data.model"));
+    isCompressedOops = getIsCompressedOops();
+  }
+
+  public static boolean getIsCompressedOops() {
+    final String architecture = System.getProperty("sun.arch.data.model");
+    Class<?> beanClazz = Long.class;
+    try {
+      beanClazz = Class.forName("com.sun.management.HotSpotDiagnosticMXBean");
+    } catch (Exception e) {
+      LOG.info("some exception");
+    }
+    LOG.info("Architecture {}", architecture);
+    LOG.info("beansClazz {}", beanClazz);
+    return true;
+
+  }
+
+  public static long estimate(final Object obj) {
+    return estimate(obj, new IdentityHashMap());
+  }
+
+  public static long estimate(final Object obj, final IdentityHashMap map) {
+//    LOG.info("SizeEstimator Estimate called");
+    SearchState state = new SearchState(map);
+    state.enqueue(obj);
+    while (!state.isFinished()) {
+      visitSingleObject(state.dequeue(), state);
+    }
+    return state.size;
+  }
+
+
+  // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
+  private static final int ARRAY_SIZE_FOR_SAMPLING = 400;
+  private static final int ARRAY_SAMPLE_SIZE = 100; // should be lower than ARRAY_SIZE_FOR_SAMPLING
+
+  private static void visitArray(final Object array, final Class<?> cls, final SearchState state) {
+    Class<?> elementClass = cls.getComponentType();
+    long length = Array.getLength(array);
+    long arrSize = alignSize(objectSize + INT_SIZE); // 24 with 16 byte object size after alignment
+    if (elementClass.isPrimitive()) {
+      arrSize += alignSize(length * getPrimitiveSize(elementClass));
+      state.size += arrSize;
+    } else {
+      arrSize += alignSize(length * pointerSize);
+      state.size += arrSize;
+      if (length <= ARRAY_SIZE_FOR_SAMPLING) {
+        int arrayIndex = 0;
+        while (arrayIndex < length) {
+          Object selected = Array.get(array, arrayIndex);
+          state.enqueue(selected);
+          arrayIndex += 1;
+        }
+      } else {
+        // get the size of a large array by sampling
+        double sampledSize = 0.0;
+        Random rand = new Random(42);
+        Set<Integer> chosen = new HashSet<Integer>(ARRAY_SAMPLE_SIZE);
+        int index = 0;
+        for (int i = 0; i < ARRAY_SAMPLE_SIZE; i++) {
+          index = rand.nextInt((int) length);
+          while (chosen.contains(index)) {
+            index = rand.nextInt((int) length);
+          }
+          chosen.add(index);
+          Object element = Array.get(array, index); // randomly sampled element
+          sampledSize += SizeEstimator.estimate(element, state.visited);
+        }
+        state.size += Double.valueOf(((length / (ARRAY_SAMPLE_SIZE * 1.0)) * sampledSize)).longValue();
+      }
+    }
+  }
+
+
+  private static void visitSingleObject(final Object obj, final SearchState state) {
+    Class<?> cls = obj.getClass();
+    if (cls.isArray()) {
+      visitArray(obj, cls, state);
+    } else if (!cls.getName().startsWith("java.lang.reflect")
+      && !(obj instanceof ClassLoader || obj instanceof  Class)) {
+      ClassInfo classInfo = getClassInfo(cls);
+      state.size += classInfo.shellSize;
+      for (Field field : classInfo.pointerFields) {
+        try {
+          state.enqueue(field.get(obj));
+        } catch (IllegalArgumentException e) {
+          // pass fields that can't be accessed with field.get(obj)
+          continue;
+        } catch (IllegalAccessException e) {
+          // pass fields that can't be accessed with field.get(obj)
+          continue;
+        }
+      }
+    }
+  }
+
+  /**
+   * A class to represent what state the search is currently in.
+   */
+  private static class SearchState {
+    private IdentityHashMap visited;
+    private long size = 0L;
+    private Stack<Object> stack;
+
+    SearchState(final IdentityHashMap map) {
+      this.visited = map;
+      this.stack = new Stack<>();
+    }
+
+
+    void enqueue(final Object obj) {
+      if (obj != null && !visited.containsKey(obj)) {
+        visited.put(obj, null);
+        stack.add(obj);
+      }
+    }
+
+    boolean isFinished() {
+      return stack.isEmpty();
+    }
+
+    Object dequeue() {
+      return stack.pop();
+    }
+  } // SearchState
+
+  private static long getPrimitiveSize(final Class<?> cls) {
+    if (cls == byte.class) {
+      return BYTE_SIZE;
+    } else if (cls == boolean.class) {
+      return BOOLEAN_SIZE;
+    } else if (cls == char.class) {
+      return CHAR_SIZE;
+    } else if (cls == short.class) {
+      return SHORT_SIZE;
+    } else if (cls == int.class) {
+      return INT_SIZE;
+    } else if (cls == long.class) {
+      return LONG_SIZE;
+    } else if (cls == float.class) {
+      return FLOAT_SIZE;
+    } else if (cls == double.class) {
+      return DOUBLE_SIZE;
+    } else {
+      throw new IllegalArgumentException(
+        "Non-primitive class " + cls + " passed to primitiveSize()");
+    }
+  }
+
+  /**
+   * A class for ClassInfo, which contains the class overhead size and members the class has.
+   */
+  private static class ClassInfo {
+    ClassInfo(final long shellSize, final List pointerFields) {
+      this.shellSize = shellSize;
+      this.pointerFields = pointerFields;
+    }
+    private long shellSize;
+    private List<Field> pointerFields;
+  }
+
+  /**
+   * Get the cached info or compute the ClassInfo for a given class.
+   * @return the computed classInfo
+   */
+  private static ClassInfo getClassInfo(final Class<?> cls) {
+    // base case
+    if (cls == Object.class) {
+      ClassInfo info = new ClassInfo(8L, new ArrayList<Field>());
+      classInfos.put(cls, info);
+      return info;
+    }
+    ClassInfo info = classInfos.get(cls);
+    if (info != null) {
+      return info;
+    }
+    Class<?> superClass = cls.getSuperclass();
+    ClassInfo parent = getClassInfo(superClass);
+    long shellSize = parent.shellSize;
+    List<Field> pointerFields = parent.pointerFields;
+
+    // iterate through the fields of this class and gather information.
+    for (Field field : cls.getDeclaredFields()) {
+      if (!java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+        Class<?> fieldClass = field.getType();
+        // handle primitive members
+        if (fieldClass.isPrimitive()) {
+          shellSize += getPrimitiveSize(fieldClass);
+        } else { // handle non-primitive references
+            field.setAccessible(true); // Enable future get()'s on this field
+            shellSize += pointerSize;
+            pointerFields.add(field);
+        }
+      }
+    }
+    // cache the newly computed ClassInfo
+    shellSize = alignSize(shellSize);
+    ClassInfo newInfo = new ClassInfo(shellSize, pointerFields);
+    classInfos.put(cls, newInfo);
+    return newInfo;
+  }
+
+  private static long alignSize(final long size) {
+    return (size + ALIGN_SIZE - 1) & ~(ALIGN_SIZE - 1);
+  }
+}
+
+

Review comment:
       Delete the empty lines below
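
For reference, the alignment and sampling arithmetic used in the `SizeEstimator` diff above works out as shown below. `alignSize` is copied from the diff; the wrapper class `AlignSketch` and the helper `extrapolate` are hypothetical names that isolate the scaling expression from `visitArray`.

```java
/** Isolates two pieces of arithmetic from the SizeEstimator diff. */
final class AlignSketch {
  static final int ALIGN_SIZE = 8;

  /** Rounds size up to the next multiple of ALIGN_SIZE (a power of two). */
  static long alignSize(final long size) {
    return (size + ALIGN_SIZE - 1) & ~(ALIGN_SIZE - 1);
  }

  /**
   * Scales the total size of the sampled elements up to the whole array,
   * mirroring (length / (ARRAY_SAMPLE_SIZE * 1.0)) * sampledSize in the diff.
   */
  static long extrapolate(final long length, final double sampledBytes, final int sampleCount) {
    return (long) ((length / (double) sampleCount) * sampledBytes);
  }
}
```

With the 16-byte object header assumed in the diff, `alignSize(16 + 4)` gives 24, which matches the comment in `visitArray`. An array of 1000 references whose 100 sampled elements total 2400 bytes is extrapolated to 24000 bytes.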

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
##########
@@ -367,27 +396,47 @@ public void removeBlock(final String blockId,
    * @throws InvalidProtocolBufferException from errors during parsing context descriptor
    */
   public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
+    LOG.info("dongjoo: BMW onOutputContext, outputcontext {}", outputContext);
     final ControlMessage.BlockTransferContextDescriptor descriptor =
       ControlMessage.BlockTransferContextDescriptor.PARSER.parseFrom(outputContext.getContextDescriptor());
     final DataStoreProperty.Value blockStore = convertBlockStore(descriptor.getBlockStore());
     final String blockId = descriptor.getBlockId();
     final KeyRange keyRange = SerializationUtils.deserialize(descriptor.getKeyRange().toByteArray());
-
+    LOG.info("Dongjoo, descriptor, blockstore, blockid keyrange finalized, before run");
     backgroundExecutorService.submit(new Runnable() {
       @Override
       public void run() {
         try {
           final Optional<Block> optionalBlock = getBlockStore(blockStore).readBlock(blockId);
+          final Optional<Block> optionalSpilledBlock = getSpilledBlock(blockId);

Review comment:
       Here and in everything below, I couldn't really understand the need for `optionalSpilledBlock`. Is it disjoint from `optionalBlock`?
   Does a Block move from `optionalBlock` to `optionalSpilledBlock` when it is spilled?
   Are there any consistency issues? (It looks like there are no locks.) E.g. a Block was in `optionalBlock` of Executor 1 -> Executor 2 read it -> Executor 1 moved the Block to `optionalSpilledBlock`?
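
One way to make the lookup and the move mutually exclusive is sketched below. This is an illustration under assumptions, not the PR's code: `SpillRegistrySketch` is a hypothetical class, block contents are reduced to a `String`, and the sketch only guards the lookup inside one executor. It does not cover a reader that already holds a reference to the in-memory block when the spill happens.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Tracks where each block lives; readers and the spill path share one lock. */
final class SpillRegistrySketch {
  private final Map<String, String> memoryBlocks = new HashMap<>();
  private final Map<String, String> spilledBlocks = new HashMap<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  void putInMemory(final String blockId, final String data) {
    lock.writeLock().lock();
    try {
      memoryBlocks.put(blockId, data);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Moves a block from memory to the spilled map; readers never see it in both or neither. */
  boolean spill(final String blockId) {
    lock.writeLock().lock();
    try {
      final String data = memoryBlocks.remove(blockId);
      if (data == null) {
        return false;
      }
      spilledBlocks.put(blockId, data);
      return true;
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Looks in memory first, then among spilled blocks, under the read lock. */
  Optional<String> read(final String blockId) {
    lock.readLock().lock();
    try {
      final String inMemory = memoryBlocks.get(blockId);
      return inMemory != null
        ? Optional.of(inMemory)
        : Optional.ofNullable(spilledBlocks.get(blockId));
    } finally {
      lock.readLock().unlock();
    }
  }

  /** Returns "MEMORY", "FILE", or "NONE" for the given block id. */
  String location(final String blockId) {
    lock.readLock().lock();
    try {
      if (memoryBlocks.containsKey(blockId)) {
        return "MEMORY";
      }
      return spilledBlocks.containsKey(blockId) ? "FILE" : "NONE";
    } finally {
      lock.readLock().unlock();
    }
  }
}
```

Because the spill takes the write lock, a concurrent `read` observes the block either before or after the move, never halfway through it.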

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
##########
@@ -100,12 +139,77 @@ public void writeWatermark(final Watermark watermark) {
   }
 
   /**
-   * Notifies that all writes for a block is end.
-   * Further write about a committed block will throw an exception.
+   * Notifies that all writes for a block have ended.
+   * Further write to a committed block will throw an exception.
    */
   @Override
   public void close() {
+    // for MemoryStore blocks
+    if (blockStoreValue == DataStoreProperty.Value.MEMORY_STORE
+      || blockStoreValue == DataStoreProperty.Value.SERIALIZED_MEMORY_STORE) {
+      LOG.info("BOW close called, iterating over STV and writing to block");
+      if (memoryManager.acquireStorageMemory(blockToWrite.getId(), sizeTrackingVector.estimateSize())) {
+        sizeTrackingVector.forEach(element -> blockToWrite.write(partitioner.partition(element), element));
+//        for (Object element : sizeTrackingVector) {
+//          blockToWrite.write(partitioner.partition(element), element);
+//        }
+      } else { // block does not fit in memory
+        LOG.info("Not enough storage memory, remaing StoragePool Memory: {}, size of block is {}, blockID: {}",
+          memoryManager.getRemainingStorageMemory(), sizeTrackingVector.estimateSize(), blockToWrite.getId());
+//        blockStoreValue = DataStoreProperty.Value.LOCAL_FILE_STORE;
+//        create new file block
+//        String newBlockId = blockToWrite.getId();
+//        BlockStore previousBlockStore = blockManagerWorker.getBlockStore(DataStoreProperty.Value.LOCAL_FILE_STORE);
+//        LOG.info("try to delete old block");
+//        previousBlockStore.deleteBlock(blockToWrite.getId());
+//        BlockStore newBlockStore = blockManagerWorker.getBlockStore(DataStoreProperty.Value.LOCAL_FILE_STORE);
+//        Block newBlock = newBlockStore.createBlock(newBlockId);
+//        LOG.info("writing to new block, newblockid {}", newBlock.getId());
+//        sizeTrackingVector.forEach(element -> newBlock.write(partitioner.partition(element), element));
+//        blockToWrite = newBlock;
+//        runtimeEdge.changeDataStoreProperty(DataStoreProperty.Value.LOCAL_FILE_STORE);
+//        BlockStore fileBLockStore = blockManagerWorker.getBlockStore(DataStoreProperty.Value.LOCAL_FILE_STORE);
+        /// dongjoo: change id + spilled
+//        Block newBlock = blockManagerWorker.createBlock(blockToWrite.getId(),
+//          DataStoreProperty.Value.LOCAL_FILE_STORE);
+//        blockStoreValue = DataStoreProperty.Value.LOCAL_FILE_STORE;
+//        block
+//        sizeTrackingVector.forEach(element -> blockToWrite.write(partitioner.partition(element), element));
+        // populate newly created block
+        sizeTrackingVector.forEach(element ->
+          potentialSpilledBlocktoWrite.write(partitioner.partition(element), element));
+//        sizeTrackingVector.forEach(element -> newBlock.write(partitioner.partition(element), element));
+        blockManagerWorker.putSpilledBlock(blockToWrite, potentialSpilledBlocktoWrite);

Review comment:
       Let's also discuss this way of spilling. I think spilling a Block this way (creating a bogus potentialBlock that isn't in the PhysicalPlan, storing data in it, and replacing the original Block with this potentialBlock) will definitely introduce complex consistency issues.

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
##########
@@ -100,12 +139,77 @@ public void writeWatermark(final Watermark watermark) {
   }
 
   /**
-   * Notifies that all writes for a block is end.
-   * Further write about a committed block will throw an exception.
+   * Notifies that all writes for a block have ended.
+   * Further write to a committed block will throw an exception.
    */
   @Override
   public void close() {
+    // for MemoryStore blocks
+    if (blockStoreValue == DataStoreProperty.Value.MEMORY_STORE
+      || blockStoreValue == DataStoreProperty.Value.SERIALIZED_MEMORY_STORE) {
+      LOG.info("BOW close called, iterating over STV and writing to block");
+      if (memoryManager.acquireStorageMemory(blockToWrite.getId(), sizeTrackingVector.estimateSize())) {
+        sizeTrackingVector.forEach(element -> blockToWrite.write(partitioner.partition(element), element));
+//        for (Object element : sizeTrackingVector) {
+//          blockToWrite.write(partitioner.partition(element), element);
+//        }
+      } else { // block does not fit in memory
+        LOG.info("Not enough storage memory, remaing StoragePool Memory: {}, size of block is {}, blockID: {}",
+          memoryManager.getRemainingStorageMemory(), sizeTrackingVector.estimateSize(), blockToWrite.getId());
+//        blockStoreValue = DataStoreProperty.Value.LOCAL_FILE_STORE;
+//        create new file block
+//        String newBlockId = blockToWrite.getId();
+//        BlockStore previousBlockStore = blockManagerWorker.getBlockStore(DataStoreProperty.Value.LOCAL_FILE_STORE);
+//        LOG.info("try to delete old block");
+//        previousBlockStore.deleteBlock(blockToWrite.getId());
+//        BlockStore newBlockStore = blockManagerWorker.getBlockStore(DataStoreProperty.Value.LOCAL_FILE_STORE);
+//        Block newBlock = newBlockStore.createBlock(newBlockId);
+//        LOG.info("writing to new block, newblockid {}", newBlock.getId());
+//        sizeTrackingVector.forEach(element -> newBlock.write(partitioner.partition(element), element));
+//        blockToWrite = newBlock;
+//        runtimeEdge.changeDataStoreProperty(DataStoreProperty.Value.LOCAL_FILE_STORE);
+//        BlockStore fileBLockStore = blockManagerWorker.getBlockStore(DataStoreProperty.Value.LOCAL_FILE_STORE);
+        /// dongjoo: change id + spilled
+//        Block newBlock = blockManagerWorker.createBlock(blockToWrite.getId(),
+//          DataStoreProperty.Value.LOCAL_FILE_STORE);
+//        blockStoreValue = DataStoreProperty.Value.LOCAL_FILE_STORE;
+//        block
+//        sizeTrackingVector.forEach(element -> blockToWrite.write(partitioner.partition(element), element));
+        // populate newly created block
+        sizeTrackingVector.forEach(element ->
+          potentialSpilledBlocktoWrite.write(partitioner.partition(element), element));
+//        sizeTrackingVector.forEach(element -> newBlock.write(partitioner.partition(element), element));
+        blockManagerWorker.putSpilledBlock(blockToWrite, potentialSpilledBlocktoWrite);
+//        runtimeEdge.changeDataStoreProperty(blockStoreValue);
+        // replace original block with new block
+//        blockToWrite = newBlock;
+          // create new block with runtime id manager?
+//        RuntimeIdManager.generateBlockId(runtimeEdge.getId(), srcTaskId), blockStoreValue);
+
+        // write to memblock (debug)
+//        sizeTrackingVector.forEach(element -> blockToWrite.write(partitioner.partition(element), element));
+        // commit the file block
+//        LOG.info("dongjoo BlockOutputWriter close NEW block, closing {} id {}", newBlock, newBlock.getId());
+        final DataPersistenceProperty.Value persistence = DataPersistenceProperty.Value.KEEP; // hardcode as keep?

Review comment:
       Ditto.

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
##########
@@ -101,12 +105,16 @@ public TaskExecutor(final Task task,
                       final IntermediateDataIOFactory intermediateDataIOFactory,
                       final BroadcastManagerWorker broadcastManagerWorker,
                       final MetricMessageSender metricMessageSender,
+                      //dongjoo
+                      final MemoryManager memoryManager,

Review comment:
       If not needed, let's remove memoryManager from here.







[GitHub] [incubator-nemo] codinggosu commented on a change in pull request #298: [NEMO-455] (WIP) MemoryManager to keep track of remaining storage, execution memory

Posted by GitBox <gi...@apache.org>.
codinggosu commented on a change in pull request #298:
URL: https://github.com/apache/incubator-nemo/pull/298#discussion_r456470441



##########
File path: dongjooals.sh
##########
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash

Review comment:
       OK! I will remove all .sh files



