You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/05/21 22:19:06 UTC

incubator-tinkerpop git commit: Giraph and Spark now both use the same Rule engine for Aggregating memory -- INCR, AND, OR, SET, etc.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 65de05dd4 -> 51919f4a9


Giraph and Spark now both use the same Rule engine for Aggregating memory -- INCR, AND, OR, SET, etc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/51919f4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/51919f4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/51919f4a

Branch: refs/heads/master
Commit: 51919f4a9b3a497048298b672e8a9239515a0af2
Parents: 65de05d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu May 21 14:19:08 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu May 21 14:19:18 2015 -0600

----------------------------------------------------------------------
 .../computer/giraph/GiraphComputeVertex.java    |   3 +-
 .../process/computer/giraph/GiraphMemory.java   |  31 +++---
 .../computer/giraph/MemoryAggregator.java       |  65 +++++------
 .../process/computer/giraph/RuleWritable.java   | 107 -------------------
 .../process/computer/spark/RuleAccumulator.java |  12 +--
 .../process/computer/spark/SparkMemory.java     |   8 +-
 .../hadoop/process/computer/util/Rule.java      |  44 +++++++-
 .../hadoop/structure/io/HadoopPools.java        |   7 +-
 8 files changed, 104 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
index eb07274..1507def 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
@@ -50,7 +51,7 @@ public final class GiraphComputeVertex extends Vertex<ObjectWritable, VertexWrit
         final GiraphMemory memory = workerContext.getMemory();
         final GiraphMessenger messenger = workerContext.getMessenger(this, messages.iterator());
         ///////////
-        if (!(Boolean) ((RuleWritable) this.getAggregatedValue(Constants.GREMLIN_HADOOP_HALT)).getObject()) {
+        if (!(Boolean) ((Rule) this.getAggregatedValue(Constants.GREMLIN_HADOOP_HALT)).getObject()) {
             vertexProgram.execute(ComputerGraph.vertexProgram(this.getValue().get(), vertexProgram), messenger, memory);
         } else if (workerContext.deriveMemory()) {
             final MapMemory mapMemory = new MapMemory();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
index 306bf6f..78e0e45 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
 import org.apache.commons.configuration.Configuration;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
@@ -77,7 +78,7 @@ public final class GiraphMemory extends MasterCompute implements Memory {
                 }
                 this.registerPersistentAggregator(Constants.GREMLIN_HADOOP_HALT, MemoryAggregator.class);
                 this.registerPersistentAggregator(Constants.HIDDEN_RUNTIME, MemoryAggregator.class);
-                this.setAggregatedValue(Constants.GREMLIN_HADOOP_HALT, new RuleWritable(RuleWritable.Rule.SET, false));
+                this.setAggregatedValue(Constants.GREMLIN_HADOOP_HALT, new Rule(Rule.Operation.SET, Boolean.FALSE));
                 this.set(Constants.HIDDEN_RUNTIME, System.currentTimeMillis());
             } catch (final Exception e) {
                 throw new IllegalStateException(e.getMessage(), e);
@@ -90,7 +91,7 @@ public final class GiraphMemory extends MasterCompute implements Memory {
                 if (!this.getConf().getBoolean(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, false)) // no need for the extra BSP round if memory is not required
                     this.haltComputation();
                 else
-                    this.setAggregatedValue(Constants.GREMLIN_HADOOP_HALT, new RuleWritable(RuleWritable.Rule.SET, true));
+                    this.setAggregatedValue(Constants.GREMLIN_HADOOP_HALT, new Rule(Rule.Operation.SET, Boolean.TRUE));
             }
         }
     }
@@ -117,14 +118,14 @@ public final class GiraphMemory extends MasterCompute implements Memory {
 
     @Override
     public boolean exists(final String key) {
-        final RuleWritable rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
+        final Rule rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
         return null != rule.getObject();
     }
 
     @Override
     public <R> R get(final String key) throws IllegalArgumentException {
         //this.checkKey(key);
-        final RuleWritable rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
+        final Rule rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
         if (null == rule.getObject())
             throw Memory.Exceptions.memoryDoesNotExist(key);
         else
@@ -135,20 +136,20 @@ public final class GiraphMemory extends MasterCompute implements Memory {
     public void set(final String key, Object value) {
         this.checkKeyValue(key, value);
         if (this.isMasterCompute)
-            this.setAggregatedValue(key, new RuleWritable(RuleWritable.Rule.SET, value));
+            this.setAggregatedValue(key, new Rule(Rule.Operation.SET, value));
         else
-            this.worker.aggregate(key, new RuleWritable(RuleWritable.Rule.SET, value));
+            this.worker.aggregate(key, new Rule(Rule.Operation.SET, value));
     }
 
     @Override
     public void and(final String key, final boolean bool) {
         this.checkKeyValue(key, bool);
         if (this.isMasterCompute) {  // only called on setup() and terminate()
-            Boolean value = this.<RuleWritable>getAggregatedValue(key).<Boolean>getObject();
+            Boolean value = this.<Rule>getAggregatedValue(key).<Boolean>getObject();
             value = null == value ? bool : bool && value;
-            this.setAggregatedValue(key, new RuleWritable(RuleWritable.Rule.AND, value));
+            this.setAggregatedValue(key, new Rule(Rule.Operation.AND, value));
         } else {
-            this.worker.aggregate(key, new RuleWritable(RuleWritable.Rule.AND, bool));
+            this.worker.aggregate(key, new Rule(Rule.Operation.AND, bool));
         }
     }
 
@@ -156,11 +157,11 @@ public final class GiraphMemory extends MasterCompute implements Memory {
     public void or(final String key, final boolean bool) {
         this.checkKeyValue(key, bool);
         if (this.isMasterCompute) {   // only called on setup() and terminate()
-            Boolean value = this.<RuleWritable>getAggregatedValue(key).<Boolean>getObject();
+            Boolean value = this.<Rule>getAggregatedValue(key).<Boolean>getObject();
             value = null == value ? bool : bool || value;
-            this.setAggregatedValue(key, new RuleWritable(RuleWritable.Rule.OR, value));
+            this.setAggregatedValue(key, new Rule(Rule.Operation.OR, value));
         } else {
-            this.worker.aggregate(key, new RuleWritable(RuleWritable.Rule.OR, bool));
+            this.worker.aggregate(key, new Rule(Rule.Operation.OR, bool));
         }
     }
 
@@ -168,11 +169,11 @@ public final class GiraphMemory extends MasterCompute implements Memory {
     public void incr(final String key, final long delta) {
         this.checkKeyValue(key, delta);
         if (this.isMasterCompute) {   // only called on setup() and terminate()
-            Number value = this.<RuleWritable>getAggregatedValue(key).<Number>getObject();
+            Number value = this.<Rule>getAggregatedValue(key).<Number>getObject();
             value = null == value ? delta : value.longValue() + delta;
-            this.setAggregatedValue(key, new RuleWritable(RuleWritable.Rule.INCR, value));
+            this.setAggregatedValue(key, new Rule(Rule.Operation.INCR, value));
         } else {
-            this.worker.aggregate(key, new RuleWritable(RuleWritable.Rule.INCR, delta));
+            this.worker.aggregate(key, new Rule(Rule.Operation.INCR, delta));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java
index 1ba0f4e..8f11fb3 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java
@@ -19,68 +19,69 @@
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
 
 import org.apache.giraph.aggregators.Aggregator;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MemoryAggregator implements Aggregator<RuleWritable> {
+public final class MemoryAggregator implements Aggregator<Rule> {
 
-    private Object value;
-    private RuleWritable.Rule lastRule = null;
+    private Object currentObject;
+    private Rule.Operation lastOperation = null;
 
     public MemoryAggregator() {
-        this.value = null;
+        this.currentObject = null;
     }
 
     @Override
-    public RuleWritable getAggregatedValue() {
-        if (null == this.value)
+    public Rule getAggregatedValue() {
+        if (null == this.currentObject)
             return createInitialValue();
-        else if (this.value instanceof Long)
-            return new RuleWritable(RuleWritable.Rule.INCR, this.value);
+        else if (this.currentObject instanceof Long)
+            return new Rule(Rule.Operation.INCR, this.currentObject);
         else
-            return new RuleWritable(null == this.lastRule ? RuleWritable.Rule.NO_OP : this.lastRule, this.value);
+            return new Rule(null == this.lastOperation ? Rule.Operation.NO_OP : this.lastOperation, this.currentObject);
     }
 
     @Override
-    public void setAggregatedValue(final RuleWritable rule) {
-        this.value = rule.getObject();
+    public void setAggregatedValue(final Rule rule) {
+        this.currentObject = rule.getObject();
     }
 
     @Override
     public void reset() {
-        this.value = null;
+        this.currentObject = null;
     }
 
     @Override
-    public RuleWritable createInitialValue() {
-        return new RuleWritable(RuleWritable.Rule.NO_OP, null);
+    public Rule createInitialValue() {
+        return new Rule(Rule.Operation.NO_OP, null);
     }
 
     @Override
-    public void aggregate(RuleWritable ruleWritable) {
-        final RuleWritable.Rule rule = ruleWritable.getRule();
+    public void aggregate(final Rule ruleWritable) {
+        final Rule.Operation rule = ruleWritable.getOperation();
         final Object object = ruleWritable.getObject();
-        if (rule != RuleWritable.Rule.NO_OP)
-            this.lastRule = rule;
+        if (rule != Rule.Operation.NO_OP)
+            this.lastOperation = rule;
 
-        if (null == this.value || rule.equals(RuleWritable.Rule.SET)) {
-            this.value = object;
+        if (null == this.currentObject || rule.equals(Rule.Operation.SET)) {
+            this.currentObject = object;
         } else {
-            if (rule.equals(RuleWritable.Rule.INCR)) {
-                this.value = (Long) this.value + (Long) object;
-            } else if (rule.equals(RuleWritable.Rule.AND)) {
-                this.value = (Boolean) this.value && (Boolean) object;
-            } else if (rule.equals(RuleWritable.Rule.OR)) {
-                this.value = (Boolean) this.value || (Boolean) object;
-            } else if (rule.equals(RuleWritable.Rule.NO_OP)) {
+            if (rule.equals(Rule.Operation.INCR)) {
+                this.currentObject = (Long) this.currentObject + (Long) object;
+            } else if (rule.equals(Rule.Operation.AND)) {
+                this.currentObject = (Boolean) this.currentObject && (Boolean) object;
+            } else if (rule.equals(Rule.Operation.OR)) {
+                this.currentObject = (Boolean) this.currentObject || (Boolean) object;
+            } else if (rule.equals(Rule.Operation.NO_OP)) {
                 if (object instanceof Boolean) { // only happens when NO_OP booleans are being propagated will this occur
-                    if (null == this.lastRule) {
+                    if (null == this.lastOperation) {
                         // do nothing ... why?
-                    } else if (this.lastRule.equals(RuleWritable.Rule.AND)) {
-                        this.value = (Boolean) this.value && (Boolean) object;
-                    } else if (this.lastRule.equals(RuleWritable.Rule.OR)) {
-                        this.value = (Boolean) this.value || (Boolean) object;
+                    } else if (this.lastOperation.equals(Rule.Operation.AND)) {
+                        this.currentObject = (Boolean) this.currentObject && (Boolean) object;
+                    } else if (this.lastOperation.equals(Rule.Operation.OR)) {
+                        this.currentObject = (Boolean) this.currentObject || (Boolean) object;
                     } else {
                         throw new IllegalStateException("This state should not have occurred: " + ruleWritable);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/RuleWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/RuleWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/RuleWritable.java
deleted file mode 100644
index ed309e7..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/RuleWritable.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
-
-import org.apache.tinkerpop.gremlin.util.Serializer;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class RuleWritable implements Writable {
-
-    public enum Rule {
-        OR, AND, INCR, SET, NO_OP
-    }
-
-    private Rule rule;
-    private Object object;
-
-    public RuleWritable() {
-
-    }
-
-    public RuleWritable(final Rule rule, final Object object) {
-        this.rule = rule;
-        this.object = object;
-    }
-
-    public <T> T getObject() {
-        return (T) this.object;
-    }
-
-    public Rule getRule() {
-        return this.rule;
-    }
-
-    // TODO: Don't use Kryo (its sin)
-
-    @Override
-    public void readFields(final DataInput input) throws IOException {
-        this.rule = Rule.values()[WritableUtils.readVInt(input)];
-        final int objectLength = WritableUtils.readVInt(input);
-        final byte[] objectBytes = new byte[objectLength];
-        for (int i = 0; i < objectLength; i++) {
-            objectBytes[i] = input.readByte();
-        }
-        try {
-            this.object = Serializer.deserializeObject(objectBytes);
-        } catch (final ClassNotFoundException e) {
-            throw new IOException(e.getMessage(), e);
-        }
-
-        /*this.rule = Rule.values()[WritableUtils.readVInt(input)];
-        int objectLength = WritableUtils.readVInt(input);
-        byte[] bytes = new byte[objectLength];
-        for (int i = 0; i < objectLength; i++) {
-            bytes[i] = input.readByte();
-        }
-        final Input in = new Input(new ByteArrayInputStream(bytes));
-        this.object = Constants.GRYO.readClassAndObject(in);
-        in.close();*/
-    }
-
-    @Override
-    public void write(final DataOutput output) throws IOException {
-        WritableUtils.writeVInt(output, this.rule.ordinal());
-        final byte[] objectBytes = Serializer.serializeObject(this.object);
-        WritableUtils.writeVInt(output, objectBytes.length);
-        output.write(objectBytes);
-
-        /*
-        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        final Output out = new Output(outputStream);
-        Constants.GRYO.writeClassAndObject(out, this.object);
-        out.flush();
-        WritableUtils.writeVInt(output, this.rule.ordinal());
-        WritableUtils.writeVInt(output, outputStream.toByteArray().length);
-        output.write(outputStream.toByteArray());
-        out.close(); */
-    }
-
-    public String toString() {
-        return this.rule + ":" + this.object;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
index 6bd9921..422b676 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
@@ -28,22 +28,22 @@ public final class RuleAccumulator implements AccumulatorParam<Rule> {
 
     @Override
     public Rule addAccumulator(final Rule a, final Rule b) {
-        if (a.operation.equals(Rule.Operation.NO_OP))
+        if (a.getOperation().equals(Rule.Operation.NO_OP))
             return b;
-        if (b.operation.equals(Rule.Operation.NO_OP))
+        if (b.getOperation().equals(Rule.Operation.NO_OP))
             return a;
         else
-            return new Rule(b.operation, b.operation.compute(a.object, b.object));
+            return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
     }
 
     @Override
     public Rule addInPlace(final Rule a, final Rule b) {
-        if (a.operation.equals(Rule.Operation.NO_OP))
+        if (a.getOperation().equals(Rule.Operation.NO_OP))
             return b;
-        if (b.operation.equals(Rule.Operation.NO_OP))
+        if (b.getOperation().equals(Rule.Operation.NO_OP))
             return a;
         else
-            return new Rule(b.operation, b.operation.compute(a.object, b.object));
+            return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
index d02be69..e2de405 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
@@ -73,7 +73,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
         else {
             final Set<String> trueKeys = new HashSet<>();
             this.memory.forEach((key, value) -> {
-                if (value.value().object != null)
+                if (value.value().getObject() != null)
                     trueKeys.add(key);
             });
             return Collections.unmodifiableSet(trueKeys);
@@ -163,8 +163,8 @@ public final class SparkMemory implements Memory.Admin, Serializable {
         this.broadcast.destroy(true); // do we need to block?
         final Map<String, Object> toBroadcast = new HashMap<>();
         this.memory.forEach((key, rule) -> {
-            if (null != rule.value().object)
-                toBroadcast.put(key, rule.value().object);
+            if (null != rule.value().getObject())
+                toBroadcast.put(key, rule.value().getObject());
         });
         this.broadcast = sparkContext.broadcast(toBroadcast);
     }
@@ -176,6 +176,6 @@ public final class SparkMemory implements Memory.Admin, Serializable {
     }
 
     private <R> R getValue(final String key) {
-        return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value().object;
+        return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value().getObject();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
index 7869da2..e4648c0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
@@ -18,12 +18,19 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.util;
 
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tinkerpop.gremlin.util.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class Rule implements Serializable {
+public final class Rule implements Writable, Serializable {
 
     public enum Operation {
         OR {
@@ -67,15 +74,46 @@ public final class Rule implements Serializable {
         public abstract Object compute(final Object first, final Object second);
     }
 
-    public final Operation operation;
-    public final Object object;
+    private Operation operation;
+    private Object object;
 
     public Rule(final Operation operation, final Object object) {
         this.operation = operation;
         this.object = object;
     }
 
+    public Operation getOperation() {
+        return this.operation;
+    }
+
+    public <R> R getObject() {
+        return (R) this.object;
+    }
+
     public String toString() {
         return "rule[" + this.operation + ":" + this.object + "]";
     }
+
+    @Override
+    public void write(final DataOutput output) throws IOException {
+        WritableUtils.writeVInt(output, this.operation.ordinal());
+        final byte[] objectBytes = Serializer.serializeObject(this.object);
+        WritableUtils.writeVInt(output, objectBytes.length);
+        output.write(objectBytes);
+    }
+
+    @Override
+    public void readFields(final DataInput input) throws IOException {
+        this.operation = Operation.values()[WritableUtils.readVInt(input)];
+        final int objectLength = WritableUtils.readVInt(input);
+        final byte[] objectBytes = new byte[objectLength];
+        for (int i = 0; i < objectLength; i++) {
+            objectBytes[i] = input.readByte();
+        }
+        try {
+            this.object = Serializer.deserializeObject(objectBytes);
+        } catch (final ClassNotFoundException e) {
+            throw new IOException(e.getMessage(), e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index bff3960..b714e2e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -19,18 +19,15 @@
 package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
 import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class HadoopPools {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(HadoopPools.class);
-
     private HadoopPools() {
     }
 
@@ -53,7 +50,7 @@ public final class HadoopPools {
 
     public static GryoPool getGryoPool() {
         if (!INITIALIZED)
-            LOGGER.info("The " + HadoopPools.class.getSimpleName() + " has not be initialized, using the default pool");     // TODO: this is necessary because we can't get the pool intialized in the Merger code of the Hadoop process.
+            HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() + " has not be initialized, using the default pool");     // TODO: this is necessary because we can't get the pool intialized in the Merger code of the Hadoop process.
         return GRYO_POOL;
     }
 }