You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/16 01:12:19 UTC

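svn commit: r1398568 - in /giraph/trunk: ./ giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/ giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/ giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/ giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/ giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/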

Author: aching
Date: Mon Oct 15 23:12:19 2012
New Revision: 1398568

URL: http://svn.apache.org/viewvc?rev=1398568&view=rev
Log:
GIRAPH-371: Replace BspUtils in giraph-formats-contrib for speed. (aching)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1398568&r1=1398567&r2=1398568&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct 15 23:12:19 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-371: Replace BspUtils in giraph-formats-contrib for
+  speed. (aching)
+
   GIRAPH-369: bin/giraph broken (Nitay Joffe via ereisman)
 
   GIRAPH-368: HBase Vertex I/O formats handle setConf() internally (bfem via ereisman)

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java?rev=1398568&r1=1398567&r2=1398568&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java Mon Oct 15 23:12:19 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.io.accumulo;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.hadoop.io.Writable;
@@ -71,9 +72,11 @@ public abstract class AccumuloVertexInpu
       V extends Writable, E extends Writable, M extends Writable>
       implements VertexReader<I, V, E, M> {
 
+    /** Giraph configuration */
+    private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
     /**
-    * Used by subclasses to read key/value pairs.
-    */
+     * Used by subclasses to read key/value pairs.
+     */
     private final RecordReader<Key, Value> reader;
     /** Context passed to initialize */
     private TaskAttemptContext context;
@@ -86,20 +89,25 @@ public abstract class AccumuloVertexInpu
       this.reader = reader;
     }
 
-      /**
-       * initialize the reader.
-       *
-       * @param inputSplit Input split to be used for reading vertices.
-       * @param context Context from the task.
-       * @throws IOException
-       * @throws InterruptedException
-       */
+    public ImmutableClassesGiraphConfiguration<I, V, E, M> getConfiguration() {
+      return configuration;
+    }
 
+    /**
+     * initialize the reader.
+     *
+     * @param inputSplit Input split to be used for reading vertices.
+     * @param context Context from the task.
+     * @throws IOException
+     * @throws InterruptedException
+     */
     public void initialize(InputSplit inputSplit,
                            TaskAttemptContext context)
       throws IOException, InterruptedException {
       reader.initialize(inputSplit, context);
       this.context = context;
+      this.configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+          context.getConfiguration());
     }
 
     /**

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java?rev=1398568&r1=1398567&r2=1398568&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java Mon Oct 15 23:12:19 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.giraph.io.hbase;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.hadoop.hbase.client.Result;
@@ -92,16 +93,16 @@ public abstract  class HBaseVertexInputF
           V extends Writable,
           E extends Writable, M extends Writable>
           implements VertexReader<I, V, E, M> {
-
-    /**
-     * reader instance
-     */
+    /** Giraph configuration */
+    private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+    /** Reader instance */
     private final RecordReader<ImmutableBytesWritable, Result> reader;
     /** Context passed to initialize */
     private TaskAttemptContext context;
 
     /**
-     *  sets the base TableInputFOrmat and creates a record reader.
+     * Sets the base TableInputFormat and creates a record reader.
+     *
      * @param split InputSplit
      * @param context Context
      * @throws IOException
@@ -112,6 +113,10 @@ public abstract  class HBaseVertexInputF
       this.reader = BASE_FORMAT.createRecordReader(split, context);
     }
 
+    public ImmutableClassesGiraphConfiguration<I, V, E, M> getConfiguration() {
+      return configuration;
+    }
+
     /**
      * initialize
      *
@@ -126,6 +131,8 @@ public abstract  class HBaseVertexInputF
       InterruptedException {
       reader.initialize(inputSplit, context);
       this.context = context;
+      this.configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+          context.getConfiguration());
     }
 
     /**

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java?rev=1398568&r1=1398567&r2=1398568&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java Mon Oct 15 23:12:19 2012
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
@@ -59,69 +59,74 @@ import org.apache.log4j.Logger;
 
 @SuppressWarnings("rawtypes")
 public abstract class HCatalogVertexInputFormat<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable>
-        extends VertexInputFormat<I, V, E, M> {
+    I extends WritableComparable,
+    V extends Writable,
+    E extends Writable,
+    M extends Writable>
+    extends VertexInputFormat<I, V, E, M> {
   /**
-  * H catalog input format.
-  */
+   * H catalog input format.
+   */
   private HCatInputFormat hCatInputFormat = new HCatInputFormat();
 
   @Override
   public final List<InputSplit> getSplits(
-            final JobContext context, final int numWorkers)
+      final JobContext context, final int numWorkers)
     throws IOException, InterruptedException {
     return hCatInputFormat.getSplits(context);
   }
 
- /**
-  * Abstract class that users should subclass
-  * based on their specific vertex
-  * input. HCatRecord can be parsed to get the
-  * required data for implementing
-  * getCurrentVertex(). If the vertex spans more
-  * than one HCatRecord,
-  * nextVertex() should be overwritten to handle that logic as well.
-  */
+  /**
+   * Abstract class that users should subclass
+   * based on their specific vertex
+   * input. HCatRecord can be parsed to get the
+   * required data for implementing
+   * getCurrentVertex(). If the vertex spans more
+   * than one HCatRecord,
+   * nextVertex() should be overwritten to handle that logic as well.
+   */
   protected abstract class HCatalogVertexReader implements
-        VertexReader<I, V, E, M> {
-
+      VertexReader<I, V, E, M> {
+    /** Giraph configuration */
+    private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
     /** Internal HCatRecordReader. */
     private RecordReader<WritableComparable,
-                HCatRecord> hCatRecordReader;
-
+        HCatRecord> hCatRecordReader;
     /** Context passed to initialize. */
     private TaskAttemptContext context;
 
+    public ImmutableClassesGiraphConfiguration<I, V, E, M> getConfiguration() {
+      return configuration;
+    }
+
     /**
      * Initialize with the HCatRecordReader.
      *
      * @param recordReader internal reader
      */
     private void initialize(
-       final RecordReader<
-                   WritableComparable, HCatRecord>
-                   recordReader) {
+        final RecordReader<
+            WritableComparable, HCatRecord>
+            recordReader) {
       this.hCatRecordReader = recordReader;
     }
 
     @Override
     public final void initialize(
-           final InputSplit inputSplit,
-           final TaskAttemptContext ctxt)
+        final InputSplit inputSplit,
+        final TaskAttemptContext ctxt)
       throws IOException, InterruptedException {
       hCatRecordReader.initialize(inputSplit, ctxt);
       this.context = ctxt;
+      this.configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+          context.getConfiguration());
     }
 
     @Override
-    public boolean nextVertex()
-      throws IOException, InterruptedException {
-       // Users can override this if desired,
-       // and a vertex is bigger than
-       // a single row.
+    public boolean nextVertex() throws IOException, InterruptedException {
+      // Users can override this if desired,
+      // and a vertex is bigger than
+      // a single row.
       return hCatRecordReader.nextKeyValue();
     }
 
@@ -131,222 +136,217 @@ public abstract class HCatalogVertexInpu
     }
 
     @Override
-    public final float getProgress()
-      throws IOException, InterruptedException {
+    public final float getProgress() throws IOException, InterruptedException {
       return hCatRecordReader.getProgress();
     }
 
     /**
-    * Get the record reader.
-    * @return Record reader to be used for reading.
-    */
+     * Get the record reader.
+     * @return Record reader to be used for reading.
+     */
     protected final RecordReader<WritableComparable, HCatRecord>
     getRecordReader() {
       return hCatRecordReader;
     }
 
-   /**
-    * Get the context.
-    *
-    *
-    *
-    * @return Context passed to initialize.
-    */
+    /**
+     * Get the context.
+     *
+     *
+     *
+     * @return Context passed to initialize.
+     */
     protected final TaskAttemptContext getContext() {
       return context;
     }
   }
 
   /**
-  * create vertex writer instance.
-  * @return HCatalogVertexReader
-  */
+   * create vertex writer instance.
+   * @return HCatalogVertexReader
+   */
   protected abstract HCatalogVertexReader createVertexReader();
 
   @Override
   public final VertexReader<I, V, E, M>
   createVertexReader(final InputSplit split,
-    final TaskAttemptContext context)
+                     final TaskAttemptContext context)
     throws IOException {
     try {
       HCatalogVertexReader reader = createVertexReader();
       reader.initialize(hCatInputFormat.
-              createRecordReader(split, context));
+          createRecordReader(split, context));
       return reader;
     } catch (InterruptedException e) {
       throw new IllegalStateException(
-               "createVertexReader: " +
-                      "Interrupted creating reader.", e);
+          "createVertexReader: " +
+              "Interrupted creating reader.", e);
     }
   }
 
- /**
-  * HCatalogVertexReader for tables holding
-  * complete vertex info within each
-  * row.
-  */
+  /**
+   * HCatalogVertexReader for tables holding
+   * complete vertex info within each
+   * row.
+   */
   protected abstract class SingleRowHCatalogVertexReader
-        extends HCatalogVertexReader {
+      extends HCatalogVertexReader {
 
     /**
-    * 1024 const.
-    */
+     * 1024 const.
+     */
     private static final int BYTE_CONST = 1024;
 
     /**
-    *  logger
-    */
+     *  logger
+     */
     private final Logger log =
-            Logger.getLogger(SingleRowHCatalogVertexReader.class);
+        Logger.getLogger(SingleRowHCatalogVertexReader.class);
     /**
-    * record count.
-    */
+     * record count.
+     */
     private int recordCount = 0;
     /**
-    * modulus check counter.
-    */
+     * modulus check counter.
+     */
     private final int recordModLimit = 1000;
 
 
     /**
-    * get vertex id.
-    * @param record hcat record
-    * @return I id
-         */
+     * get vertex id.
+     * @param record hcat record
+     * @return I id
+     */
     protected abstract I getVertexId(HCatRecord record);
 
     /**
-    * get vertext value.
-    * @param record hcat record
-    * @return V value
-    */
+     * get vertex value.
+     * @param record hcat record
+     * @return V value
+     */
     protected abstract V getVertexValue(HCatRecord record);
 
     /**
-    * get edges.
-    * @param record hcat record
-    * @return Map edges
-    */
+     * get edges.
+     * @param record hcat record
+     * @return Map edges
+     */
     protected abstract Map<I, E> getEdges(HCatRecord record);
 
     @Override
     public final Vertex<I, V, E, M> getCurrentVertex()
       throws IOException, InterruptedException {
       HCatRecord record = getRecordReader().getCurrentValue();
-      Vertex<I, V, E, M> vertex =
-                    BspUtils.createVertex(getContext()
-                            .getConfiguration());
+      Vertex<I, V, E, M> vertex = getConfiguration().createVertex();
       vertex.initialize(getVertexId(record), getVertexValue(record),
-                    getEdges(record));
+          getEdges(record));
       ++recordCount;
       if ((recordCount % recordModLimit) == 0) {
         log.info("read " + recordCount + " records");
-                // memory usage
+        // memory usage
         Runtime runtime = Runtime.getRuntime();
         double gb = BYTE_CONST *
-                    BYTE_CONST *
-                    BYTE_CONST;
+            BYTE_CONST *
+            BYTE_CONST;
         log.info("Memory: " + (runtime.totalMemory() / gb) +
-                "GB total = " +
-                ((runtime.totalMemory() - runtime.freeMemory()) / gb) +
-                "GB used + " + (runtime.freeMemory() / gb) +
-                "GB free, " + (runtime.maxMemory() / gb) + "GB max");
+            "GB total = " +
+            ((runtime.totalMemory() - runtime.freeMemory()) / gb) +
+            "GB used + " + (runtime.freeMemory() / gb) +
+            "GB free, " + (runtime.maxMemory() / gb) + "GB max");
       }
       return vertex;
     }
   }
   /**
-  * HCatalogVertexReader for tables
-  * holding vertex info across multiple rows
-  * sorted by vertex id column,
-  * so that they appear consecutively to the
-  * RecordReader.
-  */
+   * HCatalogVertexReader for tables
+   * holding vertex info across multiple rows
+   * sorted by vertex id column,
+   * so that they appear consecutively to the
+   * RecordReader.
+   */
   protected abstract class MultiRowHCatalogVertexReader extends
-            HCatalogVertexReader {
+      HCatalogVertexReader {
     /**
-    * modulus check counter.
-    */
+     * modulus check counter.
+     */
     private static final int RECORD_MOD_LIMIT = 1000;
 
     /**
-    *  logger
-    */
+     *  logger
+     */
     private final Logger log =
-            Logger.getLogger(MultiRowHCatalogVertexReader.class);
+        Logger.getLogger(MultiRowHCatalogVertexReader.class);
     /**
-    * current vertex id.
-    */
+     * current vertex id.
+     */
     private I currentVertexId = null;
     /**
-    * destination edge map.
-    */
+     * destination edge map.
+     */
     private Map<I, E> destEdgeMap = Maps.newHashMap();
     /**
-    * record for vertex.
-    */
+     * record for vertex.
+     */
     private List<HCatRecord> recordsForVertex = Lists.newArrayList();
     /**
-    * record count.
-    */
+     * record count.
+     */
     private int recordCount = 0;
     /**
-    * vertex.
-    *
-    */
+     * vertex.
+     *
+     */
     private Vertex<I, V, E, M> vertex = null;
     /**
-    * get vertex id from record.
-    *
-    * @param record hcat
-    * @return I vertex id
-    */
+     * get vertex id from record.
+     *
+     * @param record hcat
+     * @return I vertex id
+     */
     protected abstract I getVertexId(HCatRecord record);
 
     /**
-    * get vertex value from record.
-    * @param records all vertex values
-    * @return V iterable of record values
-    */
+     * get vertex value from record.
+     * @param records all vertex values
+     * @return V iterable of record values
+     */
     protected abstract V getVertexValue(
-                Iterable<HCatRecord> records);
+        Iterable<HCatRecord> records);
 
     /**
-    * get target vertex id from record.
-    *
-    * @param record hcat
-    * @return I vertex id of target.
-    */
+     * get target vertex id from record.
+     *
+     * @param record hcat
+     * @return I vertex id of target.
+     */
     protected abstract I getTargetVertexId(HCatRecord record);
 
     /**
-    * get edge value from record.
-    *
-    * @param record hcat.
-    * @return E edge value.
-    */
+     * get edge value from record.
+     *
+     * @param record hcat.
+     * @return E edge value.
+     */
     protected abstract E getEdgeValue(HCatRecord record);
 
     @Override
     public final Vertex<I, V, E, M>
-    getCurrentVertex()
-      throws IOException, InterruptedException {
+    getCurrentVertex() throws IOException, InterruptedException {
       return vertex;
     }
 
     @Override
-    public boolean nextVertex()
-      throws IOException, InterruptedException {
+    public boolean nextVertex() throws IOException, InterruptedException {
       while (getRecordReader().nextKeyValue()) {
         HCatRecord record =
-                getRecordReader().getCurrentValue();
+            getRecordReader().getCurrentValue();
         if (currentVertexId == null) {
           currentVertexId = getVertexId(record);
         }
         if (currentVertexId.equals(getVertexId(record))) {
           destEdgeMap.put(
-            getTargetVertexId(record),
-            getEdgeValue(record));
+              getTargetVertexId(record),
+              getEdgeValue(record));
           recordsForVertex.add(record);
         } else {
           createCurrentVertex();
@@ -368,11 +368,10 @@ public abstract class HCatalogVertexInpu
     }
 
     /**
-    * create current vertex.
-    */
+     * create current vertex.
+     */
     private void createCurrentVertex() {
-      vertex = BspUtils.
-              createVertex(getContext().getConfiguration());
+      vertex = getConfiguration().createVertex();
       vertex.initialize(currentVertexId, getVertexValue(recordsForVertex),
           destEdgeMap);
       destEdgeMap.clear();

Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java?rev=1398568&r1=1398567&r2=1398568&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java Mon Oct 15 23:12:19 2012
@@ -22,7 +22,6 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.Value;
 import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -80,8 +79,7 @@ public class AccumuloEdgeInputFormat
               Key key = getRecordReader().getCurrentKey();
               Value value = getRecordReader().getCurrentValue();
               Vertex<Text, Text, Text, Text> vertex =
-                  BspUtils.<Text, Text, Text, Text>createVertex(
-                      getContext().getConfiguration());
+                  getConfiguration().createVertex();
               Text vertexId = key.getRow();
               Map<Text, Text> edges = Maps.newHashMap();
               String edge = new String(value.get());

Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java?rev=1398568&r1=1398567&r2=1398568&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java Mon Oct 15 23:12:19 2012
@@ -20,15 +20,11 @@ package org.apache.giraph.io.hbase.edgem
 import com.google.common.collect.Maps;
 import org.apache.giraph.io.hbase.HBaseVertexInputFormat;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
@@ -45,7 +41,6 @@ public class TableEdgeInputFormat extend
     private static final Logger log =
             Logger.getLogger(TableEdgeInputFormat.class);
     private static final Text uselessEdgeValue = new Text();
-    private Configuration conf;
 
     public VertexReader<Text, Text, Text, Text>
             createVertexReader(InputSplit split,
@@ -83,8 +78,7 @@ public class TableEdgeInputFormat extend
                 throws IOException, InterruptedException {
             Result row = getRecordReader().getCurrentValue();
             Vertex<Text, Text, Text, Text> vertex =
-                    BspUtils.<Text, Text, Text, Text>
-                            createVertex(getContext().getConfiguration());
+                getConfiguration().createVertex();
             Text vertexId = new Text(Bytes.toString(row.getRow()));
             Map<Text, Text> edges = Maps.newHashMap();
             String edge = Bytes.toString(row.getValue(CF, CHILDREN));