You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/05/21 22:19:06 UTC
incubator-tinkerpop git commit: Giraph and Spark now both use the same Rule engine for Aggregating memory -- INCR, AND, OR, SET, etc.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 65de05dd4 -> 51919f4a9
Giraph and Spark now both use the same Rule engine for Aggregating memory -- INCR, AND, OR, SET, etc.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/51919f4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/51919f4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/51919f4a
Branch: refs/heads/master
Commit: 51919f4a9b3a497048298b672e8a9239515a0af2
Parents: 65de05d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu May 21 14:19:08 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu May 21 14:19:18 2015 -0600
----------------------------------------------------------------------
.../computer/giraph/GiraphComputeVertex.java | 3 +-
.../process/computer/giraph/GiraphMemory.java | 31 +++---
.../computer/giraph/MemoryAggregator.java | 65 +++++------
.../process/computer/giraph/RuleWritable.java | 107 -------------------
.../process/computer/spark/RuleAccumulator.java | 12 +--
.../process/computer/spark/SparkMemory.java | 8 +-
.../hadoop/process/computer/util/Rule.java | 44 +++++++-
.../hadoop/structure/io/HadoopPools.java | 7 +-
8 files changed, 104 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
index eb07274..1507def 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphComputeVertex.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.NullWritable;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
@@ -50,7 +51,7 @@ public final class GiraphComputeVertex extends Vertex<ObjectWritable, VertexWrit
final GiraphMemory memory = workerContext.getMemory();
final GiraphMessenger messenger = workerContext.getMessenger(this, messages.iterator());
///////////
- if (!(Boolean) ((RuleWritable) this.getAggregatedValue(Constants.GREMLIN_HADOOP_HALT)).getObject()) {
+ if (!(Boolean) ((Rule) this.getAggregatedValue(Constants.GREMLIN_HADOOP_HALT)).getObject()) {
vertexProgram.execute(ComputerGraph.vertexProgram(this.getValue().get(), vertexProgram), messenger, memory);
} else if (workerContext.deriveMemory()) {
final MapMemory mapMemory = new MapMemory();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
index 306bf6f..78e0e45 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMemory.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
import org.apache.commons.configuration.Configuration;
import org.apache.giraph.master.MasterCompute;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
@@ -77,7 +78,7 @@ public final class GiraphMemory extends MasterCompute implements Memory {
}
this.registerPersistentAggregator(Constants.GREMLIN_HADOOP_HALT, MemoryAggregator.class);
this.registerPersistentAggregator(Constants.HIDDEN_RUNTIME, MemoryAggregator.class);
- this.setAggregatedValue(Constants.GREMLIN_HADOOP_HALT, new RuleWritable(RuleWritable.Rule.SET, false));
+ this.setAggregatedValue(Constants.GREMLIN_HADOOP_HALT, new Rule(Rule.Operation.SET, Boolean.FALSE));
this.set(Constants.HIDDEN_RUNTIME, System.currentTimeMillis());
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -90,7 +91,7 @@ public final class GiraphMemory extends MasterCompute implements Memory {
if (!this.getConf().getBoolean(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, false)) // no need for the extra BSP round if memory is not required
this.haltComputation();
else
- this.setAggregatedValue(Constants.GREMLIN_HADOOP_HALT, new RuleWritable(RuleWritable.Rule.SET, true));
+ this.setAggregatedValue(Constants.GREMLIN_HADOOP_HALT, new Rule(Rule.Operation.SET, Boolean.TRUE));
}
}
}
@@ -117,14 +118,14 @@ public final class GiraphMemory extends MasterCompute implements Memory {
@Override
public boolean exists(final String key) {
- final RuleWritable rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
+ final Rule rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
return null != rule.getObject();
}
@Override
public <R> R get(final String key) throws IllegalArgumentException {
//this.checkKey(key);
- final RuleWritable rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
+ final Rule rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
if (null == rule.getObject())
throw Memory.Exceptions.memoryDoesNotExist(key);
else
@@ -135,20 +136,20 @@ public final class GiraphMemory extends MasterCompute implements Memory {
public void set(final String key, Object value) {
this.checkKeyValue(key, value);
if (this.isMasterCompute)
- this.setAggregatedValue(key, new RuleWritable(RuleWritable.Rule.SET, value));
+ this.setAggregatedValue(key, new Rule(Rule.Operation.SET, value));
else
- this.worker.aggregate(key, new RuleWritable(RuleWritable.Rule.SET, value));
+ this.worker.aggregate(key, new Rule(Rule.Operation.SET, value));
}
@Override
public void and(final String key, final boolean bool) {
this.checkKeyValue(key, bool);
if (this.isMasterCompute) { // only called on setup() and terminate()
- Boolean value = this.<RuleWritable>getAggregatedValue(key).<Boolean>getObject();
+ Boolean value = this.<Rule>getAggregatedValue(key).<Boolean>getObject();
value = null == value ? bool : bool && value;
- this.setAggregatedValue(key, new RuleWritable(RuleWritable.Rule.AND, value));
+ this.setAggregatedValue(key, new Rule(Rule.Operation.AND, value));
} else {
- this.worker.aggregate(key, new RuleWritable(RuleWritable.Rule.AND, bool));
+ this.worker.aggregate(key, new Rule(Rule.Operation.AND, bool));
}
}
@@ -156,11 +157,11 @@ public final class GiraphMemory extends MasterCompute implements Memory {
public void or(final String key, final boolean bool) {
this.checkKeyValue(key, bool);
if (this.isMasterCompute) { // only called on setup() and terminate()
- Boolean value = this.<RuleWritable>getAggregatedValue(key).<Boolean>getObject();
+ Boolean value = this.<Rule>getAggregatedValue(key).<Boolean>getObject();
value = null == value ? bool : bool || value;
- this.setAggregatedValue(key, new RuleWritable(RuleWritable.Rule.OR, value));
+ this.setAggregatedValue(key, new Rule(Rule.Operation.OR, value));
} else {
- this.worker.aggregate(key, new RuleWritable(RuleWritable.Rule.OR, bool));
+ this.worker.aggregate(key, new Rule(Rule.Operation.OR, bool));
}
}
@@ -168,11 +169,11 @@ public final class GiraphMemory extends MasterCompute implements Memory {
public void incr(final String key, final long delta) {
this.checkKeyValue(key, delta);
if (this.isMasterCompute) { // only called on setup() and terminate()
- Number value = this.<RuleWritable>getAggregatedValue(key).<Number>getObject();
+ Number value = this.<Rule>getAggregatedValue(key).<Number>getObject();
value = null == value ? delta : value.longValue() + delta;
- this.setAggregatedValue(key, new RuleWritable(RuleWritable.Rule.INCR, value));
+ this.setAggregatedValue(key, new Rule(Rule.Operation.INCR, value));
} else {
- this.worker.aggregate(key, new RuleWritable(RuleWritable.Rule.INCR, delta));
+ this.worker.aggregate(key, new Rule(Rule.Operation.INCR, delta));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java
index 1ba0f4e..8f11fb3 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/MemoryAggregator.java
@@ -19,68 +19,69 @@
package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
import org.apache.giraph.aggregators.Aggregator;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class MemoryAggregator implements Aggregator<RuleWritable> {
+public final class MemoryAggregator implements Aggregator<Rule> {
- private Object value;
- private RuleWritable.Rule lastRule = null;
+ private Object currentObject;
+ private Rule.Operation lastOperation = null;
public MemoryAggregator() {
- this.value = null;
+ this.currentObject = null;
}
@Override
- public RuleWritable getAggregatedValue() {
- if (null == this.value)
+ public Rule getAggregatedValue() {
+ if (null == this.currentObject)
return createInitialValue();
- else if (this.value instanceof Long)
- return new RuleWritable(RuleWritable.Rule.INCR, this.value);
+ else if (this.currentObject instanceof Long)
+ return new Rule(Rule.Operation.INCR, this.currentObject);
else
- return new RuleWritable(null == this.lastRule ? RuleWritable.Rule.NO_OP : this.lastRule, this.value);
+ return new Rule(null == this.lastOperation ? Rule.Operation.NO_OP : this.lastOperation, this.currentObject);
}
@Override
- public void setAggregatedValue(final RuleWritable rule) {
- this.value = rule.getObject();
+ public void setAggregatedValue(final Rule rule) {
+ this.currentObject = rule.getObject();
}
@Override
public void reset() {
- this.value = null;
+ this.currentObject = null;
}
@Override
- public RuleWritable createInitialValue() {
- return new RuleWritable(RuleWritable.Rule.NO_OP, null);
+ public Rule createInitialValue() {
+ return new Rule(Rule.Operation.NO_OP, null);
}
@Override
- public void aggregate(RuleWritable ruleWritable) {
- final RuleWritable.Rule rule = ruleWritable.getRule();
+ public void aggregate(final Rule ruleWritable) {
+ final Rule.Operation rule = ruleWritable.getOperation();
final Object object = ruleWritable.getObject();
- if (rule != RuleWritable.Rule.NO_OP)
- this.lastRule = rule;
+ if (rule != Rule.Operation.NO_OP)
+ this.lastOperation = rule;
- if (null == this.value || rule.equals(RuleWritable.Rule.SET)) {
- this.value = object;
+ if (null == this.currentObject || rule.equals(Rule.Operation.SET)) {
+ this.currentObject = object;
} else {
- if (rule.equals(RuleWritable.Rule.INCR)) {
- this.value = (Long) this.value + (Long) object;
- } else if (rule.equals(RuleWritable.Rule.AND)) {
- this.value = (Boolean) this.value && (Boolean) object;
- } else if (rule.equals(RuleWritable.Rule.OR)) {
- this.value = (Boolean) this.value || (Boolean) object;
- } else if (rule.equals(RuleWritable.Rule.NO_OP)) {
+ if (rule.equals(Rule.Operation.INCR)) {
+ this.currentObject = (Long) this.currentObject + (Long) object;
+ } else if (rule.equals(Rule.Operation.AND)) {
+ this.currentObject = (Boolean) this.currentObject && (Boolean) object;
+ } else if (rule.equals(Rule.Operation.OR)) {
+ this.currentObject = (Boolean) this.currentObject || (Boolean) object;
+ } else if (rule.equals(Rule.Operation.NO_OP)) {
if (object instanceof Boolean) { // only happens when NO_OP booleans are being propagated will this occur
- if (null == this.lastRule) {
+ if (null == this.lastOperation) {
// do nothing ... why?
- } else if (this.lastRule.equals(RuleWritable.Rule.AND)) {
- this.value = (Boolean) this.value && (Boolean) object;
- } else if (this.lastRule.equals(RuleWritable.Rule.OR)) {
- this.value = (Boolean) this.value || (Boolean) object;
+ } else if (this.lastOperation.equals(Rule.Operation.AND)) {
+ this.currentObject = (Boolean) this.currentObject && (Boolean) object;
+ } else if (this.lastOperation.equals(Rule.Operation.OR)) {
+ this.currentObject = (Boolean) this.currentObject || (Boolean) object;
} else {
throw new IllegalStateException("This state should not have occurred: " + ruleWritable);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/RuleWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/RuleWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/RuleWritable.java
deleted file mode 100644
index ed309e7..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/RuleWritable.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
-
-import org.apache.tinkerpop.gremlin.util.Serializer;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class RuleWritable implements Writable {
-
- public enum Rule {
- OR, AND, INCR, SET, NO_OP
- }
-
- private Rule rule;
- private Object object;
-
- public RuleWritable() {
-
- }
-
- public RuleWritable(final Rule rule, final Object object) {
- this.rule = rule;
- this.object = object;
- }
-
- public <T> T getObject() {
- return (T) this.object;
- }
-
- public Rule getRule() {
- return this.rule;
- }
-
- // TODO: Don't use Kryo (its sin)
-
- @Override
- public void readFields(final DataInput input) throws IOException {
- this.rule = Rule.values()[WritableUtils.readVInt(input)];
- final int objectLength = WritableUtils.readVInt(input);
- final byte[] objectBytes = new byte[objectLength];
- for (int i = 0; i < objectLength; i++) {
- objectBytes[i] = input.readByte();
- }
- try {
- this.object = Serializer.deserializeObject(objectBytes);
- } catch (final ClassNotFoundException e) {
- throw new IOException(e.getMessage(), e);
- }
-
- /*this.rule = Rule.values()[WritableUtils.readVInt(input)];
- int objectLength = WritableUtils.readVInt(input);
- byte[] bytes = new byte[objectLength];
- for (int i = 0; i < objectLength; i++) {
- bytes[i] = input.readByte();
- }
- final Input in = new Input(new ByteArrayInputStream(bytes));
- this.object = Constants.GRYO.readClassAndObject(in);
- in.close();*/
- }
-
- @Override
- public void write(final DataOutput output) throws IOException {
- WritableUtils.writeVInt(output, this.rule.ordinal());
- final byte[] objectBytes = Serializer.serializeObject(this.object);
- WritableUtils.writeVInt(output, objectBytes.length);
- output.write(objectBytes);
-
- /*
- final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- final Output out = new Output(outputStream);
- Constants.GRYO.writeClassAndObject(out, this.object);
- out.flush();
- WritableUtils.writeVInt(output, this.rule.ordinal());
- WritableUtils.writeVInt(output, outputStream.toByteArray().length);
- output.write(outputStream.toByteArray());
- out.close(); */
- }
-
- public String toString() {
- return this.rule + ":" + this.object;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
index 6bd9921..422b676 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
@@ -28,22 +28,22 @@ public final class RuleAccumulator implements AccumulatorParam<Rule> {
@Override
public Rule addAccumulator(final Rule a, final Rule b) {
- if (a.operation.equals(Rule.Operation.NO_OP))
+ if (a.getOperation().equals(Rule.Operation.NO_OP))
return b;
- if (b.operation.equals(Rule.Operation.NO_OP))
+ if (b.getOperation().equals(Rule.Operation.NO_OP))
return a;
else
- return new Rule(b.operation, b.operation.compute(a.object, b.object));
+ return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
}
@Override
public Rule addInPlace(final Rule a, final Rule b) {
- if (a.operation.equals(Rule.Operation.NO_OP))
+ if (a.getOperation().equals(Rule.Operation.NO_OP))
return b;
- if (b.operation.equals(Rule.Operation.NO_OP))
+ if (b.getOperation().equals(Rule.Operation.NO_OP))
return a;
else
- return new Rule(b.operation, b.operation.compute(a.object, b.object));
+ return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
index d02be69..e2de405 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
@@ -73,7 +73,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
else {
final Set<String> trueKeys = new HashSet<>();
this.memory.forEach((key, value) -> {
- if (value.value().object != null)
+ if (value.value().getObject() != null)
trueKeys.add(key);
});
return Collections.unmodifiableSet(trueKeys);
@@ -163,8 +163,8 @@ public final class SparkMemory implements Memory.Admin, Serializable {
this.broadcast.destroy(true); // do we need to block?
final Map<String, Object> toBroadcast = new HashMap<>();
this.memory.forEach((key, rule) -> {
- if (null != rule.value().object)
- toBroadcast.put(key, rule.value().object);
+ if (null != rule.value().getObject())
+ toBroadcast.put(key, rule.value().getObject());
});
this.broadcast = sparkContext.broadcast(toBroadcast);
}
@@ -176,6 +176,6 @@ public final class SparkMemory implements Memory.Admin, Serializable {
}
private <R> R getValue(final String key) {
- return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value().object;
+ return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value().getObject();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
index 7869da2..e4648c0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
@@ -18,12 +18,19 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer.util;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tinkerpop.gremlin.util.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class Rule implements Serializable {
+public final class Rule implements Writable, Serializable {
public enum Operation {
OR {
@@ -67,15 +74,46 @@ public final class Rule implements Serializable {
public abstract Object compute(final Object first, final Object second);
}
- public final Operation operation;
- public final Object object;
+ private Operation operation;
+ private Object object;
public Rule(final Operation operation, final Object object) {
this.operation = operation;
this.object = object;
}
+ public Operation getOperation() {
+ return this.operation;
+ }
+
+ public <R> R getObject() {
+ return (R) this.object;
+ }
+
public String toString() {
return "rule[" + this.operation + ":" + this.object + "]";
}
+
+ @Override
+ public void write(final DataOutput output) throws IOException {
+ WritableUtils.writeVInt(output, this.operation.ordinal());
+ final byte[] objectBytes = Serializer.serializeObject(this.object);
+ WritableUtils.writeVInt(output, objectBytes.length);
+ output.write(objectBytes);
+ }
+
+ @Override
+ public void readFields(final DataInput input) throws IOException {
+ this.operation = Operation.values()[WritableUtils.readVInt(input)];
+ final int objectLength = WritableUtils.readVInt(input);
+ final byte[] objectBytes = new byte[objectLength];
+ for (int i = 0; i < objectLength; i++) {
+ objectBytes[i] = input.readByte();
+ }
+ try {
+ this.object = Serializer.deserializeObject(objectBytes);
+ } catch (final ClassNotFoundException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/51919f4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index bff3960..b714e2e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -19,18 +19,15 @@
package org.apache.tinkerpop.gremlin.hadoop.structure.io;
import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class HadoopPools {
- private static final Logger LOGGER = LoggerFactory.getLogger(HadoopPools.class);
-
private HadoopPools() {
}
@@ -53,7 +50,7 @@ public final class HadoopPools {
public static GryoPool getGryoPool() {
if (!INITIALIZED)
- LOGGER.info("The " + HadoopPools.class.getSimpleName() + " has not be initialized, using the default pool"); // TODO: this is necessary because we can't get the pool intialized in the Merger code of the Hadoop process.
+ HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() + " has not be initialized, using the default pool"); // TODO: this is necessary because we can't get the pool intialized in the Merger code of the Hadoop process.
return GRYO_POOL;
}
}