You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/07/11 17:29:27 UTC

git commit: updated refs/heads/trunk to b51ecd2

Repository: giraph
Updated Branches:
  refs/heads/trunk 28cbe037c -> b51ecd27c


GIRAPH-1085: Add InMemoryDataAccessor

Summary: When we deal with graphs that have many vertices with very little total data associated with them (values + edges), we run into memory problems because too many objects are created, since every vertex has multiple objects associated with it. To solve this problem, we need a serialized partition representation (the current ByteArrayPartition keeps one byte[] per vertex, not per partition). We can leverage the out-of-core infrastructure and simply add a data accessor that is backed by in-memory buffers instead of disk.

Test Plan: Successfully ran a job which was failing without this.

Differential Revision: https://reviews.facebook.net/D60435


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b51ecd27
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b51ecd27
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b51ecd27

Branch: refs/heads/trunk
Commit: b51ecd27cccc520764c9ae53cabcb61d67d46d15
Parents: 28cbe03
Author: Maja Kabiljo <ma...@fb.com>
Authored: Wed Jul 6 14:57:33 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Jul 11 10:28:14 2016 -0700

----------------------------------------------------------------------
 .../giraph/ooc/data/DiskBackedDataStore.java    |   3 +-
 .../ooc/persistence/InMemoryDataAccessor.java   | 158 +++++++++++++++++++
 .../ooc/persistence/OutOfCoreDataAccessor.java  |   3 +-
 .../apache/giraph/utils/io/BigDataOutput.java   |  15 ++
 4 files changed, 177 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/b51ecd27/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
index e9ab167..c8da9a0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
@@ -241,8 +241,9 @@ public abstract class DiskBackedDataStore<T> {
         index.addIndex(DataIndex.TypeIndexEntry.BUFFER);
         OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
             oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
+        DataInput dataInput = inputWrapper.getDataInput();
         for (int i = 0; i < numBuffers; ++i) {
-          T entry = readNextEntry(inputWrapper.getDataInput());
+          T entry = readNextEntry(dataInput);
           addEntryToInMemoryPartitionData(partitionId, entry);
         }
         numBytes += inputWrapper.finalizeInput(true);

http://git-wip-us.apache.org/repos/asf/giraph/blob/b51ecd27/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java
new file mode 100644
index 0000000..4eca0f1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.persistence;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.io.BigDataInput;
+import org.apache.giraph.utils.io.BigDataOutput;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of data accessor which keeps all the data serialized but in
+ * memory. Useful to keep the number of used objects under control.
+ *
+ * TODO currently doesn't reuse any of the byte arrays so could cause more GCs
+ */
+public class InMemoryDataAccessor implements OutOfCoreDataAccessor {
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
+  /** DataInputOutput for each DataIndex used */
+  private final ConcurrentHashMap<DataIndex, BigDataOutput> data;
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration
+   */
+  public InMemoryDataAccessor(
+      ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
+    this.conf = conf;
+    data = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void initialize() {
+    // No-op
+  }
+
+  @Override
+  public void shutdown() {
+    // No-op
+  }
+
+  @Override
+  public int getNumAccessorThreads() {
+    return GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf);
+  }
+
+  @Override
+  public DataInputWrapper prepareInput(int threadId,
+      DataIndex index) throws IOException {
+    return new InMemoryDataInputWrapper(
+        new BigDataInput(data.get(index)), index);
+  }
+
+  @Override
+  public DataOutputWrapper prepareOutput(int threadId,
+      DataIndex index, boolean shouldAppend) throws IOException {
+    // Don't need to worry about synchronization here since only one thread
+    // can deal with one index
+    BigDataOutput output = data.get(index);
+    if (output == null || !shouldAppend) {
+      output = new BigDataOutput(conf);
+      data.put(index, output);
+    }
+    return new InMemoryDataOutputWrapper(output);
+  }
+
+  @Override
+  public boolean dataExist(int threadId, DataIndex index) {
+    return data.containsKey(index);
+  }
+
+  /**
+   * {@link DataOutputWrapper} implementation for {@link InMemoryDataAccessor}
+   */
+  public class InMemoryDataOutputWrapper implements DataOutputWrapper {
+    /** Output to write data to */
+    private final BigDataOutput output;
+    /** Size of output at the moment it was created */
+    private final long initialSize;
+
+    /**
+     * Constructor
+     *
+     * @param output Output to write data to
+     */
+    public InMemoryDataOutputWrapper(BigDataOutput output) {
+      this.output = output;
+      initialSize = output.getSize();
+    }
+
+    @Override
+    public DataOutput getDataOutput() {
+      return output;
+    }
+
+    @Override
+    public long finalizeOutput() {
+      return output.getSize() - initialSize;
+    }
+  }
+
+  /**
+   * {@link DataInputWrapper} implementation for {@link InMemoryDataAccessor}
+   */
+  public class InMemoryDataInputWrapper implements DataInputWrapper {
+    /** Input to read data from */
+    private final BigDataInput input;
+    /** DataIndex which this wrapper belongs to */
+    private final DataIndex index;
+
+    /**
+     * Constructor
+     *
+     * @param input Input to read data from
+     * @param index DataIndex which this wrapper belongs to
+     */
+    public InMemoryDataInputWrapper(
+        BigDataInput input, DataIndex index) {
+      this.input = input;
+      this.index = index;
+    }
+
+    @Override
+    public DataInput getDataInput() {
+      return input;
+    }
+
+    @Override
+    public long finalizeInput(boolean deleteOnClose) {
+      if (deleteOnClose) {
+        data.remove(index);
+      }
+      return input.getPos();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b51ecd27/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
index d4ddc62..cecb0f3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
@@ -82,7 +82,8 @@ public interface OutOfCoreDataAccessor {
   /** Interface to wrap <code>DataInput</code> */
   interface DataInputWrapper {
     /**
-     * @return the <code>DataInput</code>
+     * @return the <code>DataInput</code>, should return the same instance
+     * every time it's called (not start from the beginning)
      */
     DataInput getDataInput();
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/b51ecd27/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
index c0fff60..9e84ebc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
@@ -125,6 +125,21 @@ public class BigDataOutput implements DataOutput, Writable {
     return conf;
   }
 
+  /**
+   * Total bytes written: currentDataOutput plus every entry in dataOutputs
+   *
+   * @return Size in bytes
+   */
+  public long getSize() {
+    long total = 0;
+    if (dataOutputs != null) {
+      for (ExtendedDataOutput out : dataOutputs) {
+        total += out.getPos();
+      }
+    }
+    return total + currentDataOutput.getPos();
+  }
+
   @Override
   public void write(int b) throws IOException {
     getDataOutputToWriteTo().write(b);