You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2015/04/10 21:20:28 UTC
[2/3] git commit: updated refs/heads/trunk to 5d0b81a
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
index 375d054..14f889d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
@@ -30,8 +30,6 @@ public class PerGraphTypeBoolean {
private boolean vertexValue;
/** data for edge value */
private boolean edgeValue;
- /** data for incoming message */
- private boolean incomingMessage;
/** data for outgoing message */
private boolean outgoingMessage;
@@ -60,7 +58,6 @@ public class PerGraphTypeBoolean {
setVertexId(options.getVertexId(), conf);
setVertexValue(options.getVertexValue(), conf);
setEdgeValue(options.getEdgeValue(), conf);
- setIncomingMessage(options.getIncomingMessage(), conf);
setOutgoingMessage(options.getOutgoingMessage(), conf);
}
@@ -95,16 +92,6 @@ public class PerGraphTypeBoolean {
}
/**
- * Set the incoming message value data from the option
- *
- * @param option EnumConfOption option to use
- * @param conf Configuration
- */
- public void setIncomingMessage(BooleanConfOption option, Configuration conf) {
- incomingMessage = option.get(conf);
- }
-
- /**
* Set the outgoing message value data from the option
*
* @param option BooleanConfOption option to use
@@ -128,8 +115,6 @@ public class PerGraphTypeBoolean {
return vertexValue;
case EDGE_VALUE:
return edgeValue;
- case INCOMING_MESSAGE_VALUE:
- return incomingMessage;
case OUTGOING_MESSAGE_VALUE:
return outgoingMessage;
default:
@@ -142,10 +127,6 @@ public class PerGraphTypeBoolean {
return edgeValue;
}
- public boolean getIncomingMessage() {
- return incomingMessage;
- }
-
public boolean getOutgoingMessage() {
return outgoingMessage;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
index adfa979..d622546 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
@@ -30,8 +30,6 @@ public class PerGraphTypeBooleanConfOption {
private final BooleanConfOption vertexValue;
/** option for edge value */
private final BooleanConfOption edgeValue;
- /** option for incoming message */
- private final BooleanConfOption incomingMessage;
/** option for outgoing message */
private final BooleanConfOption outgoingMessage;
@@ -50,8 +48,6 @@ public class PerGraphTypeBooleanConfOption {
defaultValue, description);
edgeValue = new BooleanConfOption(keyPrefix + ".edge.value",
defaultValue, description);
- incomingMessage = new BooleanConfOption(keyPrefix + ".incoming.message",
- defaultValue, description);
outgoingMessage = new BooleanConfOption(keyPrefix + ".outgoing.message",
defaultValue, description);
}
@@ -70,8 +66,6 @@ public class PerGraphTypeBooleanConfOption {
return vertexValue;
case EDGE_VALUE:
return edgeValue;
- case INCOMING_MESSAGE_VALUE:
- return incomingMessage;
case OUTGOING_MESSAGE_VALUE:
return outgoingMessage;
default:
@@ -95,10 +89,6 @@ public class PerGraphTypeBooleanConfOption {
return edgeValue;
}
- public BooleanConfOption getIncomingMessage() {
- return incomingMessage;
- }
-
public BooleanConfOption getOutgoingMessage() {
return outgoingMessage;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
index 7003709..199b2f9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
@@ -32,8 +32,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
private T vertexValue;
/** data for edge value */
private T edgeValue;
- /** data for incoming message */
- private T incomingMessage;
/** data for outgoing message */
private T outgoingMessage;
@@ -63,7 +61,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
setVertexId(options.getVertexId(), conf);
setVertexValue(options.getVertexValue(), conf);
setEdgeValue(options.getEdgeValue(), conf);
- setIncomingMessage(options.getIncomingMessage(), conf);
setOutgoingMessage(options.getOutgoingMessage(), conf);
}
@@ -98,16 +95,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
}
/**
- * Set the incoming message value data from the option
- *
- * @param option EnumConfOption option to use
- * @param conf Configuration
- */
- public void setIncomingMessage(EnumConfOption<T> option, Configuration conf) {
- incomingMessage = option.get(conf);
- }
-
- /**
* Set the outgoing message value data from the option
*
* @param option EnumConfOption option to use
@@ -131,8 +118,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
return vertexValue;
case EDGE_VALUE:
return edgeValue;
- case INCOMING_MESSAGE_VALUE:
- return incomingMessage;
case OUTGOING_MESSAGE_VALUE:
return outgoingMessage;
default:
@@ -145,10 +130,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
return edgeValue;
}
- public T getIncomingMessage() {
- return incomingMessage;
- }
-
public T getOutgoingMessage() {
return outgoingMessage;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
index 8ae4576..c5041e2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
@@ -32,8 +32,6 @@ public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
private final EnumConfOption<T> vertexValue;
/** option for edge value */
private final EnumConfOption<T> edgeValue;
- /** option for incoming message */
- private final EnumConfOption<T> incomingMessage;
/** option for outgoing message */
private final EnumConfOption<T> outgoingMessage;
@@ -53,8 +51,6 @@ public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
defaultValue, description);
edgeValue = EnumConfOption.create(keyPrefix + ".edge.value",
klass, defaultValue, description);
- incomingMessage = EnumConfOption.create(keyPrefix + ".incoming.message",
- klass, defaultValue, description);
outgoingMessage = EnumConfOption.create(keyPrefix + ".outgoing.message",
klass, defaultValue, description);
}
@@ -89,8 +85,6 @@ public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
return vertexValue;
case EDGE_VALUE:
return edgeValue;
- case INCOMING_MESSAGE_VALUE:
- return incomingMessage;
case OUTGOING_MESSAGE_VALUE:
return outgoingMessage;
default:
@@ -114,10 +108,6 @@ public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
return edgeValue;
}
- public EnumConfOption<T> getIncomingMessage() {
- return incomingMessage;
- }
-
public EnumConfOption<T> getOutgoingMessage() {
return outgoingMessage;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
deleted file mode 100644
index 5551439..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.factories;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.base.Objects;
-
-/**
- * Factory class to create default message values.
- *
- * @param <M> Message Value
- */
-public abstract class AbstractMessageValueFactory<M extends Writable>
- implements MessageValueFactory<M> {
- /** Message value class */
- private Class<M> messageValueClass;
- /** Configuration */
- private ImmutableClassesGiraphConfiguration conf;
-
- /**
- * Get the message value class from the configuration
- *
- * @param conf Configuration
- * @return message value Class
- */
- protected abstract Class<M> extractMessageValueClass(
- ImmutableClassesGiraphConfiguration conf);
-
- @Override
- public Class<M> getValueClass() {
- return messageValueClass;
- }
-
- @Override
- public void initialize(ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- messageValueClass = extractMessageValueClass(conf);
- }
-
- @Override public M newInstance() {
- return WritableUtils.createWritable(messageValueClass, conf);
- }
-
- @Override public String toString() {
- return Objects.toStringHelper(this)
- .add("messageValueClass", messageValueClass)
- .toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
index 998c06f..a838e0f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.giraph.factories;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
@@ -29,21 +30,18 @@ import org.apache.hadoop.io.Writable;
* @param <E> Edge Value
*/
public class DefaultEdgeValueFactory<E extends Writable>
- implements EdgeValueFactory<E> {
+ implements EdgeValueFactory<E>, GiraphConfigurationSettable {
/** Cached edge value class. */
private Class<E> edgeValueClass;
/** Configuration */
private ImmutableClassesGiraphConfiguration conf;
- @Override public void initialize(ImmutableClassesGiraphConfiguration conf) {
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
edgeValueClass = conf.getEdgeValueClass();
}
- @Override public Class<E> getValueClass() {
- return edgeValueClass;
- }
-
@Override public E newInstance() {
return WritableUtils.createWritable(edgeValueClass, conf);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultIncomingMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultIncomingMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultIncomingMessageValueFactory.java
deleted file mode 100644
index bdf2966..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultIncomingMessageValueFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.factories;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Factory class to create default incoming message values.
- *
- * @param <M> Incoming Message Value
- */
-public class DefaultIncomingMessageValueFactory<M extends Writable> extends
- AbstractMessageValueFactory<M> {
- @Override protected Class<M> extractMessageValueClass(
- ImmutableClassesGiraphConfiguration conf) {
- return conf.getIncomingMessageValueClass();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultMessageValueFactory.java
new file mode 100644
index 0000000..b10d30c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultMessageValueFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.factories;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Objects;
+
+/**
+ * Factory class to create default message values.
+ *
+ * @param <M> Message Value
+ */
+public class DefaultMessageValueFactory<M extends Writable>
+ implements MessageValueFactory<M> {
+ /** Message value class */
+ private final Class<M> messageValueClass;
+ /** Configuration */
+ private final ImmutableClassesGiraphConfiguration conf;
+
+ /**
+ * Constructor
+ * @param messageValueClass message value class
+ * @param conf configuration
+ */
+ public DefaultMessageValueFactory(Class<M> messageValueClass,
+ ImmutableClassesGiraphConfiguration conf) {
+ this.messageValueClass = messageValueClass;
+ this.conf = conf;
+ }
+
+ @Override public M newInstance() {
+ return WritableUtils.createWritable(messageValueClass, conf);
+ }
+
+ @Override public String toString() {
+ return Objects.toStringHelper(this)
+ .add("messageValueClass", messageValueClass)
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultOutgoingMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultOutgoingMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultOutgoingMessageValueFactory.java
deleted file mode 100644
index a42d5e2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultOutgoingMessageValueFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.factories;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Factory class to create default outgoing message values.
- *
- * @param <M> Outgoing Message Value
- */
-public class DefaultOutgoingMessageValueFactory<M extends Writable> extends
- AbstractMessageValueFactory<M> {
- @Override protected Class<M> extractMessageValueClass(
- ImmutableClassesGiraphConfiguration conf) {
- return conf.getOutgoingMessageValueClass();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
index 305548c..d36ae57 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.giraph.factories;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.WritableComparable;
@@ -28,24 +29,19 @@ import org.apache.hadoop.io.WritableComparable;
* @param <I> Vertex ID
*/
public class DefaultVertexIdFactory<I extends WritableComparable>
- implements VertexIdFactory<I> {
+ implements VertexIdFactory<I>, GiraphConfigurationSettable {
/** Cached vertex value class. */
private Class<I> vertexIdClass;
/** Configuration */
private ImmutableClassesGiraphConfiguration conf;
@Override
- public void initialize(ImmutableClassesGiraphConfiguration conf) {
+ public void setConf(ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
vertexIdClass = conf.getVertexIdClass();
}
@Override
- public Class<I> getValueClass() {
- return vertexIdClass;
- }
-
- @Override
public I newInstance() {
return WritableUtils.createWritable(vertexIdClass, conf);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
index 634f0d5..2cc124c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
@@ -18,6 +18,7 @@
package org.apache.giraph.factories;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
@@ -29,24 +30,19 @@ import org.apache.hadoop.io.Writable;
* @param <V> Vertex value
*/
public class DefaultVertexValueFactory<V extends Writable>
- implements VertexValueFactory<V> {
+ implements VertexValueFactory<V>, GiraphConfigurationSettable {
/** Cached vertex value class. */
private Class<V> vertexValueClass;
/** Configuration */
private ImmutableClassesGiraphConfiguration conf;
@Override
- public void initialize(ImmutableClassesGiraphConfiguration conf) {
+ public void setConf(ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
vertexValueClass = conf.getVertexValueClass();
}
@Override
- public Class<V> getValueClass() {
- return vertexValueClass;
- }
-
- @Override
public V newInstance() {
return WritableUtils.createWritable(vertexValueClass, conf);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
index 806664b..e8c1c6d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
@@ -17,7 +17,6 @@
*/
package org.apache.giraph.factories;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
@@ -40,13 +39,6 @@ public class TestMessageValueFactory<M extends Writable>
this.klass = klass;
}
- @Override public Class<M> getValueClass() {
- return klass;
- }
-
- @Override public void initialize(
- ImmutableClassesGiraphConfiguration conf) { }
-
@Override public M newInstance() {
return ReflectionUtils.newInstance(klass);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java
index 733d5f2..00f35f3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java
@@ -17,17 +17,14 @@
*/
package org.apache.giraph.factories;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_FACTORY_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.INCOMING_MESSAGE_VALUE_FACTORY_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS;
import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_FACTORY_CLASS;
import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_FACTORY_CLASS;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
/**
* Holder for factories to create user types.
*
@@ -47,12 +44,6 @@ public class ValueFactories<I extends WritableComparable,
private final VertexValueFactory<V> vertexValueFactory;
/** Edge value factory. */
private final EdgeValueFactory<E> edgeValueFactory;
- // Note that for messages we store the class not the factory itself, because
- // the factory instance may change per-superstep if the graph types change.
- /** Incoming message value factory class */
- private final Class<? extends MessageValueFactory> inMsgFactoryClass;
- /** Outgoing message value factory class */
- private final Class<? extends MessageValueFactory> outMsgFactoryClass;
/**
* Constructor reading from Configuration
@@ -63,19 +54,6 @@ public class ValueFactories<I extends WritableComparable,
vertexIdFactory = VERTEX_ID_FACTORY_CLASS.newInstance(conf);
vertexValueFactory = VERTEX_VALUE_FACTORY_CLASS.newInstance(conf);
edgeValueFactory = EDGE_VALUE_FACTORY_CLASS.newInstance(conf);
- inMsgFactoryClass = INCOMING_MESSAGE_VALUE_FACTORY_CLASS.get(conf);
- outMsgFactoryClass = OUTGOING_MESSAGE_VALUE_FACTORY_CLASS.get(conf);
- }
-
- /**
- * Initialize all of the factories.
- *
- * @param conf ImmutableClassesGiraphConfiguration
- */
- public void initializeIVE(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
- vertexIdFactory.initialize(conf);
- vertexValueFactory.initialize(conf);
- edgeValueFactory.initialize(conf);
}
public EdgeValueFactory<E> getEdgeValueFactory() {
@@ -89,12 +67,4 @@ public class ValueFactories<I extends WritableComparable,
public VertexValueFactory<V> getVertexValueFactory() {
return vertexValueFactory;
}
-
- public Class<? extends MessageValueFactory> getInMsgFactoryClass() {
- return inMsgFactoryClass;
- }
-
- public Class<? extends MessageValueFactory> getOutMsgFactoryClass() {
- return outMsgFactoryClass;
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
index 5725061..de56929 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
@@ -17,7 +17,8 @@
*/
package org.apache.giraph.factories;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import java.io.Serializable;
+
import org.apache.hadoop.io.Writable;
/**
@@ -25,30 +26,11 @@ import org.apache.hadoop.io.Writable;
*
* @param <W> Writable type
*/
-public interface ValueFactory<W extends Writable> {
- /**
- * Initialize factory settings from the conf.
- * This gets called on startup and also if there are changes to the message
- * classes used. For example if the user's
- * {@link org.apache.giraph.master.MasterCompute} changes the
- * {@link org.apache.giraph.graph.Computation} and the next superstep has a
- * different message value type.
- *
- * @param conf Configuration
- */
- void initialize(ImmutableClassesGiraphConfiguration conf);
-
+public interface ValueFactory<W extends Writable> extends Serializable {
/**
* Create a new value.
*
* @return new value.
*/
W newInstance();
-
- /**
- * Get the java Class representing messages this factory creates
- *
- * @return Class<M>
- */
- Class<W> getValueClass();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 996159f..226087c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -17,6 +17,12 @@
*/
package org.apache.giraph.graph;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.messages.MessageStore;
@@ -47,12 +53,6 @@ import com.google.common.collect.Lists;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Histogram;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-
/**
* Compute as many vertex partitions as possible. Every thread will have its
* own instance of WorkerClientRequestProcessor to send requests. Note that
@@ -139,7 +139,9 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
// Thread initialization (for locality)
WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
new NettyWorkerClientRequestProcessor<I, V, E>(
- context, configuration, serviceWorker);
+ context, configuration, serviceWorker,
+ configuration.getOutgoingMessageEncodeAndStoreType().
+ useOneMessageToManyIdsEncoding());
WorkerThreadGlobalCommUsage aggregatorUsage =
serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
WorkerContext workerContext = serviceWorker.getWorkerContext();
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
index 4a0ac8f..b02159e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
@@ -90,27 +90,6 @@ public enum GraphType {
return conf.getEdgeValueFactory();
}
},
- /** Incoming message value */
- INCOMING_MESSAGE_VALUE {
- @Override
- public ClassConfOption<? extends Writable> writableConfOption() {
- return GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS;
- }
- @Override
- public ClassConfOption<? extends ValueFactory> factoryClassOption() {
- return GiraphConstants.INCOMING_MESSAGE_VALUE_FACTORY_CLASS;
- }
- @Override
- public <T extends Writable> Class<T> get(
- ImmutableClassesGiraphConfiguration conf) {
- return conf.getIncomingMessageValueClass();
- }
- @Override
- public <T extends Writable> ValueFactory<T> factory(
- ImmutableClassesGiraphConfiguration conf) {
- return conf.getIncomingMessageValueFactory();
- }
- },
/** Outgoing message value */
OUTGOING_MESSAGE_VALUE {
@Override
@@ -129,7 +108,7 @@ public enum GraphType {
@Override
public <T extends Writable> ValueFactory<T> factory(
ImmutableClassesGiraphConfiguration conf) {
- return conf.getOutgoingMessageValueFactory();
+ return conf.createOutgoingMessageValueFactory();
}
};
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index 73a7aab..38f14d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -18,6 +18,10 @@
package org.apache.giraph.job;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
+
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -25,8 +29,8 @@ import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.factories.DefaultVertexValueFactory;
import org.apache.giraph.factories.VertexValueFactory;
import org.apache.giraph.graph.DefaultVertexResolver;
-import org.apache.giraph.graph.VertexValueCombiner;
import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.VertexValueCombiner;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.VertexInputFormat;
@@ -36,10 +40,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
-import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
-
/**
* GiraphConfigurationValidator attempts to verify the consistency of
* user-chosen InputFormat, OutputFormat, and Vertex generic type
@@ -81,7 +81,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
/**
* The Configuration object for use in the validation test.
*/
- private ImmutableClassesGiraphConfiguration conf;
+ private final ImmutableClassesGiraphConfiguration conf;
/**
* Constructor to execute the validation test, throws
@@ -126,7 +126,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
* @return outgoing message value type
*/
private Class<? extends Writable> outgoingMessageValueType() {
- return conf.getGiraphTypes().getOutgoingMessageValueClass();
+ return conf.getOutgoingMessageValueClass();
}
/**
@@ -266,11 +266,11 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
* generic params match the job.
*/
private void verifyMessageCombinerGenericTypes() {
- Class<? extends MessageCombiner<I, M2>> messageCombinerClass =
- conf.getMessageCombinerClass();
- if (messageCombinerClass != null) {
+ MessageCombiner<I, M2> messageCombiner =
+ conf.createOutgoingMessageCombiner();
+ if (messageCombiner != null) {
Class<?>[] classList =
- getTypeArguments(MessageCombiner.class, messageCombinerClass);
+ getTypeArguments(MessageCombiner.class, messageCombiner.getClass());
checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
MessageCombiner.class, "vertex index");
checkEquals(classList, MSG_COMBINER_PARAM_INDEX,
@@ -356,7 +356,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
if (classList[index] == null) {
LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
} else if (!classList[index].equals(classFromComputation)) {
- throw new IllegalArgumentException(
+ throw new IllegalStateException(
"checkClassTypes: " + typeName + " types not equal, " +
"computation - " + classFromComputation +
", " + klass.getSimpleName() + " - " +
@@ -378,7 +378,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
if (classList[index] == null) {
LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
} else if (!classList[index].isAssignableFrom(classFromComputation)) {
- throw new IllegalArgumentException(
+ throw new IllegalStateException(
"checkClassTypes: " + typeName + " types not assignable, " +
"computation - " + classFromComputation +
", " + klass.getSimpleName() + " - " +
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java
index 3266d9b..1eff92b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java
@@ -86,10 +86,6 @@ public class JythonOptions {
public static final JythonGraphTypeOptions JYTHON_EDGE_VALUE =
new JythonGraphTypeOptions(GraphType.EDGE_VALUE);
- /** incoming message value options */
- public static final JythonGraphTypeOptions JYTHON_IN_MSG_VALUE =
- new JythonGraphTypeOptions(GraphType.INCOMING_MESSAGE_VALUE);
-
/** outgonig message value options */
public static final JythonGraphTypeOptions JYTHON_OUT_MSG_VALUE =
new JythonGraphTypeOptions(GraphType.OUTGOING_MESSAGE_VALUE);
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java
index 0d3d34a..c0b286f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java
@@ -18,6 +18,7 @@
package org.apache.giraph.jython.factories;
import org.apache.giraph.conf.ClassConfOption;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.StrConfOption;
import org.apache.giraph.factories.ValueFactory;
@@ -37,7 +38,7 @@ import org.python.core.PyObject;
* @param <W> writable type
*/
public abstract class JythonFactoryBase<W extends Writable>
- implements ValueFactory<W> {
+ implements ValueFactory<W>, GiraphConfigurationSettable {
/** Logger */
private static final Logger LOG = Logger.getLogger(JythonFactoryBase.class);
@@ -84,7 +85,7 @@ public abstract class JythonFactoryBase<W extends Writable>
}
@Override
- public void initialize(
+ public void setConf(
ImmutableClassesGiraphConfiguration conf) {
jythonClassName = jythonClassNameOption().get(conf);
useWrapper = conf.getValueNeedsWrappers().get(getGraphType());
@@ -105,14 +106,6 @@ public abstract class JythonFactoryBase<W extends Writable>
}
}
- @Override public Class<W> getValueClass() {
- if (useWrapper) {
- return (Class<W>) JythonWritableWrapper.class;
- } else {
- return (Class<W>) writableValueClass();
- }
- }
-
/**
* Use this factory in the {@link org.apache.hadoop.conf.Configuration}
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonIncomingMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonIncomingMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonIncomingMessageValueFactory.java
deleted file mode 100644
index e77e9c3..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonIncomingMessageValueFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.jython.factories;
-
-import org.apache.giraph.jython.JythonOptions;
-import org.apache.hadoop.io.Writable;
-
-/**
- * {@link MessageValueFactory} that creates incoming message values which are
- * Jython classes.
- *
- * @param <M> Incoming Message Value
- */
-public class JythonIncomingMessageValueFactory<M extends Writable>
- extends JythonMessageValueFactory<M> {
- @Override
- public JythonOptions.JythonGraphTypeOptions getOptions() {
- return JythonOptions.JYTHON_IN_MSG_VALUE;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java
index d2f8d9f..f798f51 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java
@@ -32,9 +32,4 @@ public abstract class JythonMessageValueFactory<M extends Writable>
public M newInstance() {
return (M) newJythonClassInstance();
}
-
- @Override
- public Class<M> getValueClass() {
- return (Class<M>) writableValueClass();
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index e942157..0b56a4f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -50,8 +50,8 @@ import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspInputFormat;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
-import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.MasterServer;
@@ -197,7 +197,7 @@ public class BspServiceMaster<I extends WritableComparable,
/** Current checkpoint status */
private CheckpointStatus checkpointStatus;
/** Checks if checkpointing supported */
- private CheckpointSupportedChecker checkpointSupportedChecker;
+ private final CheckpointSupportedChecker checkpointSupportedChecker;
/**
* Constructor for setting up the master.
@@ -803,7 +803,8 @@ public class BspServiceMaster<I extends WritableComparable,
GlobalStats globalStats = new GlobalStats();
globalStats.readFields(finalizedStream);
updateCounters(globalStats);
- SuperstepClasses superstepClasses = new SuperstepClasses();
+ SuperstepClasses superstepClasses =
+ SuperstepClasses.createToRead(getConfiguration());
superstepClasses.readFields(finalizedStream);
getConfiguration().updateSuperstepClasses(superstepClasses);
int prefixFileCount = finalizedStream.readInt();
@@ -1576,7 +1577,7 @@ public class BspServiceMaster<I extends WritableComparable,
GiraphStats.getInstance().getEdges().getValue(),
getContext());
SuperstepClasses superstepClasses =
- new SuperstepClasses(getConfiguration());
+ SuperstepClasses.createAndExtractTypes(getConfiguration());
masterCompute.setGraphState(graphState);
masterCompute.setSuperstepClasses(superstepClasses);
return superstepClasses;
@@ -1732,8 +1733,7 @@ public class BspServiceMaster<I extends WritableComparable,
// match) and if the computation is halted, no need to check any of
// the types.
if (!globalStats.getHaltComputation()) {
- superstepClasses.verifyTypesMatch(
- getConfiguration(), getSuperstep() != 0);
+ superstepClasses.verifyTypesMatch(getSuperstep() > 0);
}
getConfiguration().updateSuperstepClasses(superstepClasses);
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 85496c2..50e3b36 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -22,10 +22,12 @@ import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.MessageClasses;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
/**
@@ -173,8 +175,10 @@ public abstract class MasterCompute
/**
* Set incoming message class to be used
+ *
* @param incomingMessageClass incoming message class
*/
+ @Deprecated
public final void setIncomingMessage(
Class<? extends Writable> incomingMessageClass) {
superstepClasses.setIncomingMessageClass(incomingMessageClass);
@@ -182,6 +186,7 @@ public abstract class MasterCompute
/**
* Set outgoing message class to be used
+ *
* @param outgoingMessageClass outgoing message class
*/
public final void setOutgoingMessage(
@@ -189,6 +194,17 @@ public abstract class MasterCompute
superstepClasses.setOutgoingMessageClass(outgoingMessageClass);
}
+ /**
+ * Set outgoing message classes to be used
+ *
+ * @param outgoingMessageClasses outgoing message classes
+ */
+ public void setOutgoingMessageClasses(
+ MessageClasses<? extends WritableComparable, ? extends Writable>
+ outgoingMessageClasses) {
+ superstepClasses.setOutgoingMessageClasses(outgoingMessageClasses);
+ }
+
@Override
public final <S, R extends Writable> void registerReducer(
String name, ReduceOperation<S, R> reduceOp) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
index 8145109..8653c96 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
@@ -23,17 +23,21 @@ import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.lang.reflect.Modifier;
import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.conf.DefaultMessageClasses;
+import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
import org.apache.giraph.conf.TypesHolder;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.Language;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
+import org.python.google.common.base.Preconditions;
/**
* Holds Computation and MessageCombiner class.
@@ -41,115 +45,170 @@ import org.apache.log4j.Logger;
public class SuperstepClasses implements Writable {
/** Class logger */
private static final Logger LOG = Logger.getLogger(SuperstepClasses.class);
+ /** Configuration */
+ private final ImmutableClassesGiraphConfiguration conf;
/** Computation class to be used in the following superstep */
private Class<? extends Computation> computationClass;
- /** MessageCombiner class to be used in the following superstep */
- private Class<? extends MessageCombiner> messageCombinerClass;
- /** Incoming message class to be used in the following superstep */
- private Class<? extends Writable> incomingMessageClass;
- /** Outgoing message class to be used in the following superstep */
- private Class<? extends Writable> outgoingMessageClass;
+ /** Incoming message classes, immutable, only here for checking */
+ private MessageClasses<? extends WritableComparable, ? extends Writable>
+ incomingMessageClasses;
+ /** Outgoing message classes */
+ private MessageClasses<? extends WritableComparable, ? extends Writable>
+ outgoingMessageClasses;
/**
- * Default constructor
+ * Constructor
+ * @param conf Configuration
+ * @param computationClass computation class
+ * @param incomingMessageClasses incoming message classes
+ * @param outgoingMessageClasses outgoing message classes
*/
- public SuperstepClasses() {
+ public SuperstepClasses(
+ ImmutableClassesGiraphConfiguration conf,
+ Class<? extends Computation> computationClass,
+ MessageClasses<? extends WritableComparable, ? extends Writable>
+ incomingMessageClasses,
+ MessageClasses<? extends WritableComparable, ? extends Writable>
+ outgoingMessageClasses) {
+ this.conf = conf;
+ this.computationClass = computationClass;
+ this.incomingMessageClasses = incomingMessageClasses;
+ this.outgoingMessageClasses = outgoingMessageClasses;
}
/**
- * Constructor
- *
+ * Create empty superstep classes, readFields needs to be called afterwards
* @param conf Configuration
+ * @return Superstep classes
*/
- @SuppressWarnings("unchecked")
- public SuperstepClasses(ImmutableClassesGiraphConfiguration conf) {
- this(conf.getComputationClass(), conf.getMessageCombinerClass());
+ public static SuperstepClasses createToRead(
+ ImmutableClassesGiraphConfiguration conf) {
+ return new SuperstepClasses(conf, null, null, null);
}
/**
- * Constructor
- *
- * @param computationClass Computation class
- * @param messageCombinerClass MessageCombiner class
+ * Create superstep classes by initializing from current state
+ * in configuration
+ * @param conf Configuration
+ * @return Superstep classes
*/
- public SuperstepClasses(Class<? extends Computation> computationClass,
- Class<? extends MessageCombiner> messageCombinerClass) {
- this.computationClass = computationClass;
- this.messageCombinerClass =
- messageCombinerClass;
+ public static SuperstepClasses createAndExtractTypes(
+ ImmutableClassesGiraphConfiguration conf) {
+ return new SuperstepClasses(
+ conf,
+ conf.getComputationClass(),
+ conf.getOutgoingMessageClasses(),
+ conf.getOutgoingMessageClasses().createCopyForNewSuperstep());
}
public Class<? extends Computation> getComputationClass() {
return computationClass;
}
- public Class<? extends MessageCombiner> getMessageCombinerClass() {
- return messageCombinerClass;
+ public MessageClasses<? extends WritableComparable, ? extends Writable>
+ getOutgoingMessageClasses() {
+ return outgoingMessageClasses;
}
/**
- * Get incoming message class, either set directly, or through Computation
- * @return incoming message class
+ * Sets outgoing MessageClasses for next superstep.
+ * Should not be used together with
+ * setMessageCombinerClass/setOutgoingMessageClass methods.
+ *
+ * @param outgoingMessageClasses outgoing message classes
*/
- public Class<? extends Writable> getIncomingMessageClass() {
- if (incomingMessageClass != null) {
- return incomingMessageClass;
- }
- if (computationClass == null) {
- return null;
- }
- Class[] computationTypes = ReflectionUtils.getTypeArguments(
- TypesHolder.class, computationClass);
- return computationTypes[3];
+ public void setOutgoingMessageClasses(
+ MessageClasses<? extends WritableComparable, ? extends Writable>
+ outgoingMessageClasses) {
+ this.outgoingMessageClasses = outgoingMessageClasses;
}
/**
- * Get outgoing message class, either set directly, or through Computation
- * @return outgoing message class
+ * Set computation class
+ * @param computationClass computation class
*/
- public Class<? extends Writable> getOutgoingMessageClass() {
- if (outgoingMessageClass != null) {
- return outgoingMessageClass;
- }
- if (computationClass == null) {
- return null;
- }
- Class[] computationTypes = ReflectionUtils.getTypeArguments(
- TypesHolder.class, computationClass);
- return computationTypes[4];
- }
-
public void setComputationClass(
Class<? extends Computation> computationClass) {
this.computationClass = computationClass;
+
+ if (computationClass != null) {
+ Class[] computationTypes = ReflectionUtils.getTypeArguments(
+ TypesHolder.class, computationClass);
+ if (computationTypes[4] != null &&
+ outgoingMessageClasses instanceof DefaultMessageClasses) {
+ ((DefaultMessageClasses) outgoingMessageClasses)
+ .setIfNotModifiedMessageClass(computationTypes[4]);
+ }
+ }
}
+ /**
+ * Set message combiner class.
+ * Should not be used together with setOutgoingMessageClasses
+ * (throws exception if called with it),
+ * as it is unnecessary to do so.
+ *
+ * @param messageCombinerClass message combiner class
+ */
public void setMessageCombinerClass(
Class<? extends MessageCombiner> messageCombinerClass) {
- this.messageCombinerClass = messageCombinerClass;
+ Preconditions.checkState(
+ outgoingMessageClasses instanceof DefaultMessageClasses);
+ ((DefaultMessageClasses) outgoingMessageClasses).
+ setMessageCombinerClass(messageCombinerClass);
}
+ /**
+ * Set incoming message class
+ * @param incomingMessageClass incoming message class
+ */
+ @Deprecated
public void setIncomingMessageClass(
Class<? extends Writable> incomingMessageClass) {
- this.incomingMessageClass = incomingMessageClass;
+ if (!incomingMessageClasses.getMessageClass().
+ equals(incomingMessageClass)) {
+ throw new IllegalArgumentException(
+ "Cannot change incoming message class from " +
+ incomingMessageClasses.getMessageClass() +
+ " previously, to " + incomingMessageClass);
+ }
}
+ /**
+ * Set outgoing message class.
+ * Should not be used together with setOutgoingMessageClasses
+ * (throws exception if called with it),
+ * as it is unnecessary to do so.
+ *
+ * @param outgoingMessageClass outgoing message class
+ */
public void setOutgoingMessageClass(
Class<? extends Writable> outgoingMessageClass) {
- this.outgoingMessageClass = outgoingMessageClass;
+ Preconditions.checkState(
+ outgoingMessageClasses instanceof DefaultMessageClasses);
+ ((DefaultMessageClasses) outgoingMessageClasses).
+ setMessageClass(outgoingMessageClass);
+ }
+
+ /**
+ * Get message combiner class
+ * @return message combiner class
+ */
+ public Class<? extends MessageCombiner> getMessageCombinerClass() {
+ MessageCombiner combiner =
+ outgoingMessageClasses.createMessageCombiner(conf);
+ return combiner != null ? combiner.getClass() : null;
}
/**
* Verify that types of current Computation and MessageCombiner are valid.
* If types don't match an {@link IllegalStateException} will be thrown.
*
- * @param conf Configuration to verify this with
* @param checkMatchingMesssageTypes Check that the incoming/outgoing
* message types match
*/
- public void verifyTypesMatch(ImmutableClassesGiraphConfiguration conf,
- boolean checkMatchingMesssageTypes) {
+ public void verifyTypesMatch(boolean checkMatchingMesssageTypes) {
// In some cases, for example when using Jython, the Computation class may
// not be set. This is because it is created by a ComputationFactory
// dynamically and not known ahead of time. In this case there is nothing to
@@ -160,91 +219,55 @@ public class SuperstepClasses implements Writable {
Class<?>[] computationTypes = ReflectionUtils.getTypeArguments(
TypesHolder.class, computationClass);
- verifyTypes(conf.getVertexIdClass(), computationTypes[0],
+ ReflectionUtils.verifyTypes(conf.getVertexIdClass(), computationTypes[0],
"Vertex id", computationClass);
- verifyTypes(conf.getVertexValueClass(), computationTypes[1],
+ ReflectionUtils.verifyTypes(conf.getVertexValueClass(), computationTypes[1],
"Vertex value", computationClass);
- verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
+ ReflectionUtils.verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
"Edge value", computationClass);
- Class<?> incomingMessageType = getIncomingMessageClass();
- Class<?> outgoingMessageType = getOutgoingMessageClass();
-
if (checkMatchingMesssageTypes) {
- verifyTypes(incomingMessageType, conf.getOutgoingMessageValueClass(),
- "New incoming and previous outgoing message", computationClass);
- }
- if (outgoingMessageType.isInterface()) {
- throw new IllegalStateException("verifyTypesMatch: " +
- "Message type must be concrete class " + outgoingMessageType);
- }
- if (Modifier.isAbstract(outgoingMessageType.getModifiers())) {
- throw new IllegalStateException("verifyTypesMatch: " +
- "Message type can't be abstract class" + outgoingMessageType);
- }
- if (messageCombinerClass != null) {
- Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
- MessageCombiner.class, messageCombinerClass);
- verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
- "Vertex id", messageCombinerClass);
- verifyTypes(outgoingMessageType, combinerTypes[1],
- "Outgoing message", messageCombinerClass);
+ ReflectionUtils.verifyTypes(incomingMessageClasses.getMessageClass(),
+ computationTypes[3], "Incoming message type", computationClass);
}
+
+ ReflectionUtils.verifyTypes(outgoingMessageClasses.getMessageClass(),
+ computationTypes[4], "Outgoing message type", computationClass);
+
+ outgoingMessageClasses.verifyConsistent(conf);
}
/**
- * Verify that found type matches the expected type. If types don't match an
- * {@link IllegalStateException} will be thrown.
- *
- * @param expected Expected type
- * @param actual Actual type
- * @param typeDesc String description of the type (for exception description)
- * @param mainClass Class in which the actual type was found (for exception
- * description)
+ * Update GiraphClasses with updated types
+ * @param classes Giraph classes
*/
- private void verifyTypes(Class<?> expected, Class<?> actual,
- String typeDesc, Class<?> mainClass) {
- if (!expected.equals(actual)) {
- if (actual.isAssignableFrom(expected)) {
- LOG.warn("verifyTypes: proceeding with assignable types : " +
- typeDesc + " types, in " + mainClass.getName() + " " + expected +
- " expected, but " + actual + " found");
- } else {
- throw new IllegalStateException("verifyTypes: " + typeDesc +
- " types " + "don't match, in " + mainClass.getName() + " " +
- expected + " expected, but " + actual + " found");
- }
- }
+ public void updateGiraphClasses(GiraphClasses classes) {
+ classes.setComputationClass(computationClass);
+ classes.setIncomingMessageClasses(incomingMessageClasses);
+ classes.setOutgoingMessageClasses(outgoingMessageClasses);
}
@Override
public void write(DataOutput output) throws IOException {
WritableUtils.writeClass(computationClass, output);
- WritableUtils.writeClass(messageCombinerClass, output);
- WritableUtils.writeClass(incomingMessageClass, output);
- WritableUtils.writeClass(outgoingMessageClass, output);
+ WritableUtils.writeWritableObject(incomingMessageClasses, output);
+ WritableUtils.writeWritableObject(outgoingMessageClasses, output);
}
@Override
public void readFields(DataInput input) throws IOException {
computationClass = WritableUtils.readClass(input);
- messageCombinerClass = WritableUtils.readClass(input);
- incomingMessageClass = WritableUtils.readClass(input);
- outgoingMessageClass = WritableUtils.readClass(input);
+ incomingMessageClasses = WritableUtils.readWritableObject(input, conf);
+ outgoingMessageClasses = WritableUtils.readWritableObject(input, conf);
}
@Override
public String toString() {
String computationName = computationClass == null ? "_not_set_" :
computationClass.getName();
- String combinerName = (messageCombinerClass == null) ? "null" :
- messageCombinerClass.getName();
- String incomingName = (incomingMessageClass == null) ? "null" :
- incomingMessageClass.getName();
- String outgoingName = (outgoingMessageClass == null) ? "null" :
- outgoingMessageClass.getName();
-
- return "(computation=" + computationName + ",combiner=" + combinerName +
- ",incoming=" + incomingName + ",outgoing=" + outgoingName + ")";
+ return "(computation=" + computationName +
+ ",incoming=" + incomingMessageClasses +
+ ",outgoing=" + outgoingMessageClasses + ")";
}
+
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/types/NoMessage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/NoMessage.java b/giraph-core/src/main/java/org/apache/giraph/types/NoMessage.java
new file mode 100644
index 0000000..0b9daf7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/NoMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Type marking that in a particular superstep no messages will be
+ * sent.
+ * We cannot use NullWritable for this, as you could send NullWritable,
+ * to send a signal (whether a vertex receives a message or not)
+ */
+public class NoMessage implements Writable {
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ throw new IllegalStateException("NoMessage should never be read");
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ throw new IllegalStateException("NoMessage should never be written");
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index daad860..1544984 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -18,14 +18,14 @@
package org.apache.giraph.utils;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
/**
* Stores vertex id and message pairs in a single byte array.
*
@@ -37,7 +37,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
M extends Writable> extends ByteArrayVertexIdData<I, M>
implements VertexIdMessages<I, M> {
/** Message value class */
- private MessageValueFactory<M> messageValueFactory;
+ private final MessageValueFactory<M> messageValueFactory;
/** Add the message size to the stream? (Depends on the message store) */
private boolean useMessageSizeEncoding = false;
@@ -57,7 +57,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
* de-serialized right away, so this won't help.
*/
private void setUseMessageSizeEncoding() {
- if (!getConf().useMessageCombiner()) {
+ if (!getConf().useOutgoingMessageCombiner()) {
useMessageSizeEncoding = getConf().useMessageSizeEncoding();
} else {
useMessageSizeEncoding = false;
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
index f5f0fb9..028f9e0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
@@ -18,6 +18,8 @@
package org.apache.giraph.utils;
+import java.lang.reflect.Modifier;
+
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.jodah.typetools.TypeResolver;
@@ -111,4 +113,48 @@ public class ReflectionUtils {
ConfigurationUtils.configureIfPossible(result, configuration);
return result;
}
+
+ /**
+ * Verify that the concrete child type is a concrete class assignable to the
+ * parent type. If not, an {@link IllegalStateException} will be thrown.
+ *
+ * @param concreteChild Concrete child type
+ * @param parent Parent type
+ * @param typeDesc String description of the type (for exception description)
+ * @param mainClass Class in which the actual type was found (for exception
+ * description)
+ */
+ public static void verifyTypes(Class<?> concreteChild, Class<?> parent,
+ String typeDesc, Class<?> mainClass) {
+ // a parent of TypeResolver.Unknown is treated as Object
+ if (parent == TypeResolver.Unknown.class) {
+ parent = Object.class;
+ }
+
+ verifyConcrete(concreteChild, typeDesc);
+
+ if (!parent.isAssignableFrom(concreteChild)) {
+ throw new IllegalStateException("verifyTypes: " + typeDesc + " types " +
+ "don't match, in " + mainClass.getName() + " " + concreteChild +
+ " expected, but " + parent + " found");
+ }
+ }
+
+ /**
+ * Verify that given type is a concrete class (neither interface nor abstract);
+ * otherwise an {@link IllegalStateException} is thrown.
+ * @param concrete type to check
+ * @param typeDesc String description of the type (for exception description)
+ */
+ public static void verifyConcrete(
+ Class<?> concrete, String typeDesc) {
+ if (concrete.isInterface()) {
+ throw new IllegalStateException("verifyTypes: " +
+ "Type " + typeDesc + " must be concrete class " + concrete);
+ }
+ if (Modifier.isAbstract(concrete.getModifiers())) {
+ throw new IllegalStateException("verifyTypes: " +
+ "Type " + typeDesc + " can't be abstract class " + concrete);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index f614a33..2f1c2ef 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -931,7 +931,8 @@ public class BspServiceWorker<I extends WritableComparable,
waitForOtherWorkers(superstepFinishedNode);
GlobalStats globalStats = new GlobalStats();
- SuperstepClasses superstepClasses = new SuperstepClasses();
+ SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
+ getConfiguration());
WritableUtils.readFieldsFromZnode(
getZkExt(), superstepFinishedNode, false, null, globalStats,
superstepClasses);
@@ -1659,7 +1660,8 @@ public class BspServiceWorker<I extends WritableComparable,
// Load global stats and superstep classes
GlobalStats globalStats = new GlobalStats();
- SuperstepClasses superstepClasses = new SuperstepClasses();
+ SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
+ getConfiguration());
String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
DataInputStream finalizedStream =
@@ -1717,7 +1719,8 @@ else[HADOOP_NON_SECURE]*/
Collections.shuffle(randomEntryList);
WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
- getConfiguration(), this);
+ getConfiguration(), this,
+ false /* useOneMessageToManyIdsEncoding */);
for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
randomEntryList) {
for (Integer partitionId : workerPartitionList.getValue()) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 10b1a25..7b2fc0f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -18,6 +18,11 @@
package org.apache.giraph.worker;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -43,11 +48,6 @@ import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.util.PercentGauge;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-
/**
* Abstract base class for loading vertex/edge input splits.
* Every thread will has its own instance of WorkerClientRequestProcessor
@@ -103,7 +103,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
this.context = context;
this.workerClientRequestProcessor =
new NettyWorkerClientRequestProcessor<I, V, E>(
- context, configuration, bspServiceWorker);
+ context, configuration, bspServiceWorker,
+ false /* useOneMessageToManyIdsEncoding, not useful for input */);
this.useLocality = configuration.useInputSplitLocality();
this.splitsHandler = splitsHandler;
this.configuration = configuration;
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
index eb2497b..bf20580 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
@@ -18,21 +18,38 @@
package org.apache.giraph.comm;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
import org.apache.commons.io.FileUtils;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
-import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore;
import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore;
+import org.apache.giraph.conf.DefaultMessageClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
import org.apache.giraph.factories.TestMessageValueFactory;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.CollectionUtils;
@@ -46,23 +63,10 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
/** Test for different types of message stores */
public class TestMessageStores {
@@ -220,7 +224,12 @@ public class TestMessageStores {
}
out.close();
- messageStore = messageStoreFactory.newStore(new TestMessageValueFactory<IntWritable>(IntWritable.class));
+ messageStore = messageStoreFactory.newStore(
+ new DefaultMessageClasses(
+ IntWritable.class,
+ DefaultMessageValueFactory.class,
+ null,
+ MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION));
DataInputStream in = new DataInputStream(new BufferedInputStream(
(new FileInputStream(file))));
@@ -240,7 +249,12 @@ public class TestMessageStores {
TestData testData) throws IOException {
SortedMap<IntWritable, Collection<IntWritable>> messages =
new TreeMap<IntWritable, Collection<IntWritable>>();
- S messageStore = messageStoreFactory.newStore(new TestMessageValueFactory<IntWritable>(IntWritable.class));
+ S messageStore = messageStoreFactory.newStore(
+ new DefaultMessageClasses(
+ IntWritable.class,
+ DefaultMessageValueFactory.class,
+ null,
+ MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION));
putNTimes(messageStore, messages, testData);
assertTrue(equalMessages(messageStore, messages, testData));
messageStore.clearAll();
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java b/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
index 4e38bff..57529f4 100644
--- a/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
+++ b/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
@@ -18,6 +18,11 @@
package org.apache.giraph.conf;
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
@@ -31,12 +36,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import static org.junit.Assert.assertEquals;
-
/**
* Benchmark tests to insure that object creation via
* {@link ImmutableClassesGiraphConfiguration} is fast
@@ -49,7 +48,7 @@ public class TestObjectCreation {
private long startNanos = -1;
private long totalNanos = -1;
private long total = 0;
- private long expected = COUNT * (COUNT - 1) / 2L;
+ private final long expected = COUNT * (COUNT - 1) / 2L;
private ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
LongWritable> configuration;
@@ -59,7 +58,6 @@ public class TestObjectCreation {
GiraphConstants.VERTEX_ID_CLASS.set(conf, IntWritable.class);
GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
GiraphConstants.EDGE_VALUE_CLASS.set(conf, DoubleWritable.class);
- GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS.set(conf, LongWritable.class);
GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.set(conf, LongWritable.class);
conf.setComputationClass(LongNoOpComputation.class);
configuration =
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
index 721a74c..272666c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
@@ -18,10 +18,15 @@
package org.apache.giraph.io;
-import com.google.common.collect.Maps;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+
import org.apache.giraph.BspCase;
import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.factories.VertexValueFactory;
@@ -39,10 +44,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.junit.Test;
-import java.io.IOException;
-import java.util.Map;
-
-import static org.junit.Assert.*;
+import com.google.common.collect.Maps;
/**
* A test case to ensure that loading a graph from vertices and edges works as
@@ -349,14 +351,6 @@ public class TestVertexEdgeInput extends BspCase {
public static class TestVertexValueFactory
implements VertexValueFactory<IntWritable> {
@Override
- public void initialize(ImmutableClassesGiraphConfiguration conf) { }
-
- @Override
- public Class<IntWritable> getValueClass() {
- return IntWritable.class;
- }
-
- @Override
public IntWritable newInstance() {
return new IntWritable(3);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
index 4a8caaa..194bb5e 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
@@ -18,6 +18,8 @@
package org.apache.giraph.master;
+import java.io.IOException;
+
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -32,74 +34,77 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.junit.Test;
-import java.io.IOException;
-
/** Test type verification when switching computation and combiner types */
public class TestComputationCombinerTypes {
+ private void testConsecutiveComp(
+ Class<? extends Computation> firstComputationClass,
+ Class<? extends Computation> secondComputationClass) {
+ testConsecutiveComp(firstComputationClass, secondComputationClass, null);
+ }
+
+ private void testConsecutiveComp(
+ Class<? extends Computation> firstComputationClass,
+ Class<? extends Computation> secondComputationClass,
+ Class<? extends MessageCombiner> messageCombinerClass) {
+ ImmutableClassesGiraphConfiguration conf =
+ createConfiguration(firstComputationClass);
+ SuperstepClasses classes = SuperstepClasses.createAndExtractTypes(conf);
+ classes.setComputationClass(secondComputationClass);
+ classes.setMessageCombinerClass(messageCombinerClass);
+ classes.verifyTypesMatch(true);
+ }
+
@Test
public void testAllMatchWithoutCombiner() {
- SuperstepClasses classes =
- new SuperstepClasses(IntNoOpComputation.class, null);
- classes.verifyTypesMatch(createConfiguration(IntNoOpComputation.class), true);
+ testConsecutiveComp(IntNoOpComputation.class, IntNoOpComputation.class);
}
@Test
public void testAllMatchWithCombiner() {
- SuperstepClasses classes =
- new SuperstepClasses(IntIntIntLongDoubleComputation.class,
- IntDoubleMessageCombiner.class);
- classes.verifyTypesMatch(
- createConfiguration(IntIntIntIntLongComputation.class), true);
+ testConsecutiveComp(
+ IntIntIntIntLongComputation.class,
+ IntIntIntLongDoubleComputation.class,
+ IntDoubleMessageCombiner.class);
}
@Test(expected = IllegalStateException.class)
public void testDifferentIdTypes() {
- SuperstepClasses classes =
- new SuperstepClasses(LongIntIntLongIntComputation.class, null);
- classes.verifyTypesMatch(
- createConfiguration(IntIntIntIntLongComputation.class), true);
+ testConsecutiveComp(
+ IntIntIntIntLongComputation.class, LongIntIntLongIntComputation.class);
}
@Test(expected = IllegalStateException.class)
public void testDifferentVertexValueTypes() {
- SuperstepClasses classes =
- new SuperstepClasses(IntLongIntLongIntComputation.class, null);
- classes.verifyTypesMatch(
- createConfiguration(IntIntIntIntLongComputation.class), true);
+ testConsecutiveComp(
+ IntIntIntIntLongComputation.class, IntLongIntLongIntComputation.class);
}
@Test(expected = IllegalStateException.class)
public void testDifferentEdgeDataTypes() {
- SuperstepClasses classes =
- new SuperstepClasses(IntIntLongLongIntComputation.class, null);
- classes.verifyTypesMatch(
- createConfiguration(IntIntIntIntLongComputation.class), true);
+ testConsecutiveComp(
+ IntIntIntIntLongComputation.class, IntIntLongLongIntComputation.class);
}
@Test(expected = IllegalStateException.class)
public void testDifferentMessageTypes() {
- SuperstepClasses classes =
- new SuperstepClasses(IntIntIntIntLongComputation.class, null);
- classes.verifyTypesMatch(
- createConfiguration(IntIntIntLongDoubleComputation.class), true);
+ testConsecutiveComp(
+ IntIntIntLongDoubleComputation.class, IntIntIntIntLongComputation.class);
}
@Test(expected = IllegalStateException.class)
public void testDifferentCombinerIdType() {
- SuperstepClasses classes =
- new SuperstepClasses(IntIntIntLongDoubleComputation.class,
- DoubleDoubleMessageCombiner.class);
- classes.verifyTypesMatch(
- createConfiguration(IntIntIntIntLongComputation.class), true);
+ testConsecutiveComp(
+ IntIntIntIntLongComputation.class,
+ IntIntIntLongDoubleComputation.class,
+ DoubleDoubleMessageCombiner.class);
}
@Test(expected = IllegalStateException.class)
public void testDifferentCombinerMessageType() {
- SuperstepClasses classes =
- new SuperstepClasses(IntIntIntLongDoubleComputation.class,
- IntLongMessageCombiner.class);
- classes.verifyTypesMatch(
- createConfiguration(IntIntIntIntLongComputation.class), true);
+ testConsecutiveComp(
+ IntIntIntIntLongComputation.class,
+ IntIntIntLongDoubleComputation.class,
+ IntLongMessageCombiner.class);
}
private static ImmutableClassesGiraphConfiguration createConfiguration(
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
index 833061e..8a034c2 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
@@ -140,9 +140,6 @@ public class TestSwitchClasses {
break;
case 4:
setComputation(Computation1.class);
- // message types removed
- setIncomingMessage(null);
- setOutgoingMessage(null);
break;
default:
haltComputation();