You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2014/12/08 20:21:39 UTC
[4/9] git commit: updated refs/heads/trunk to 8675c84
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/mock/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/mock/package-info.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/mock/package-info.java
new file mode 100644
index 0000000..27819a9
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/mock/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes for generating test case code.
+ */
+package org.apache.giraph.debugger.mock;
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/package-info.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/package-info.java
new file mode 100644
index 0000000..d864986
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Giraph Debugger, named Graft.
+ * @see {@link https://github.com/semihsalihoglu/graft}
+ */
+package org.apache.giraph.debugger;
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatedValueWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatedValueWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatedValueWrapper.java
new file mode 100644
index 0000000..1391f44
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatedValueWrapper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.giraph.debugger.GiraphAggregator.AggregatedValue;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Wrapper class around
+ * {@link org.apache.giraph.debugger.GiraphAggregator.AggregatedValue} protocol
+ * buffer.
+ *
+ * author: semihsalihoglu
+ */
+public class AggregatedValueWrapper extends BaseWrapper {
+ /**
+ * Key of the aggregator.
+ */
+ private String key;
+ /**
+ * Value of the aggregator.
+ */
+ private Writable value;
+
+ /**
+ * Public constructor, initializing an empty aggregator. Intended to be used
+ * when reading an aggregator from a protobuf.
+ */
+ public AggregatedValueWrapper() { }
+
+ /**
+ * Constructor. Intended to be used by Graft when it's intercepting
+ * computations during debugging.
+ * @param key key of the aggregator.
+ * @param value value of the aggregator.
+ */
+ public AggregatedValueWrapper(String key, Writable value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public GeneratedMessage buildProtoObject() {
+ AggregatedValue.Builder aggregatedValueBuilder = AggregatedValue
+ .newBuilder();
+ aggregatedValueBuilder.setWritableClass(value.getClass().getName());
+ aggregatedValueBuilder.setKey(key);
+ aggregatedValueBuilder.setValue(ByteString.copyFrom(WritableUtils
+ .writeToByteArray(value)));
+ return aggregatedValueBuilder.build();
+ }
+
+ @Override
+ public GeneratedMessage parseProtoFromInputStream(InputStream inputStream)
+ throws IOException {
+ return AggregatedValue.parseFrom(inputStream);
+ }
+
+ @Override
+ public void loadFromProto(GeneratedMessage protoObject)
+ throws ClassNotFoundException, IOException, InstantiationException,
+ IllegalAccessException {
+ AggregatedValue aggregatedValueProto = (AggregatedValue) protoObject;
+ this.value = (Writable) Class.forName(
+ aggregatedValueProto.getWritableClass()).newInstance();
+ WritableUtils.readFieldsFromByteArray(aggregatedValueProto.getValue()
+ .toByteArray(), this.value);
+ this.key = aggregatedValueProto.getKey();
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public Writable getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("\nkey: " + key);
+ stringBuilder
+ .append(" aggregatedValueClass: " + value.getClass().getName());
+ stringBuilder.append(" value: " + value);
+ return stringBuilder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatorWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatorWrapper.java
new file mode 100644
index 0000000..c7a5cd2
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatorWrapper.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.debugger.GiraphAggregator.AggregatedValue;
+import org.apache.giraph.debugger.GiraphAggregator.Aggregator.Builder;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Wrapper class around
+ * {@link org.apache.giraph.debugger.GiraphAggregator.Aggregator} protocol
+ * buffer.
+ *
+ * author: semihsalihoglu
+ */
+@SuppressWarnings("rawtypes")
+public class AggregatorWrapper extends BaseWrapper {
+
+ /**
+ * Key of the aggregator.
+ */
+ private String key;
+ /**
+ * The aggregator object.
+ */
+ private final Aggregator<Writable> aggregator;
+
+ /**
+ * Constructor.
+ * @param key key of the aggregator.
+ * @param aggregator the aggregator object.
+ */
+ @SuppressWarnings("unchecked")
+ public AggregatorWrapper(String key, Aggregator aggregator) {
+ this.key = key;
+ this.aggregator = aggregator;
+ }
+
+ @Override
+ public GeneratedMessage buildProtoObject() {
+ Builder aggregatorProtoBuilder =
+ org.apache.giraph.debugger.GiraphAggregator.Aggregator.newBuilder();
+ aggregatorProtoBuilder.setAggregatorClass(aggregator.getClass().getName());
+ aggregatorProtoBuilder
+ .setAggregatedValue((AggregatedValue) new AggregatedValueWrapper(key,
+ aggregator.getAggregatedValue()).buildProtoObject());
+ return aggregatorProtoBuilder.build();
+ }
+
+ @Override
+ public GeneratedMessage parseProtoFromInputStream(InputStream inputStream)
+ throws IOException {
+ return org.apache.giraph.debugger.GiraphAggregator.Aggregator
+ .parseFrom(inputStream);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void loadFromProto(GeneratedMessage protoObject)
+ throws ClassNotFoundException, IOException, InstantiationException,
+ IllegalAccessException {
+ org.apache.giraph.debugger.GiraphAggregator.Aggregator aggregatorProto =
+ (org.apache.giraph.debugger.GiraphAggregator.Aggregator) protoObject;
+ Aggregator<Writable> giraphAggregator =
+ (org.apache.giraph.aggregators.Aggregator<Writable>) Class
+ .forName(aggregatorProto.getAggregatorClass()).newInstance();
+ AggregatedValue aggregatedValueProto = aggregatorProto.getAggregatedValue();
+ this.key = aggregatedValueProto.getKey();
+ Writable giraphAggregatedValue = (Writable) Class.forName(
+ aggregatedValueProto.getWritableClass()).newInstance();
+ WritableUtils.readFieldsFromByteArray(aggregatedValueProto.getValue()
+ .toByteArray(), giraphAggregatedValue);
+ giraphAggregator.setAggregatedValue(giraphAggregatedValue);
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public Aggregator<Writable> getAggregator() {
+ return aggregator;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("\nkey: " + key);
+ stringBuilder
+ .append(" aggregatorClass: " + aggregator.getClass().getName());
+ stringBuilder.append(" aggregatedValueClass: " +
+ aggregator.getAggregatedValue().getClass().getName());
+ stringBuilder.append(" value: " + aggregator.getAggregatedValue());
+ return stringBuilder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AsyncHDFSWriteService.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AsyncHDFSWriteService.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AsyncHDFSWriteService.java
new file mode 100644
index 0000000..5a932b1
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AsyncHDFSWriteService.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * A utility class for writing to HDFS asynchronously.
+ */
+public class AsyncHDFSWriteService {
+
+ /**
+ * Logger for this class.
+ */
+ protected static final Logger LOG = Logger
+ .getLogger(AsyncHDFSWriteService.class);
+
+ /**
+ * The thread pool that will handle the synchronous writing, and hide the
+ * latency from the callers.
+ */
+ private static ExecutorService HDFS_ASYNC_WRITE_SERVICE = Executors
+ .newFixedThreadPool(2);
+ static {
+ // Make sure we finish writing everything before shuting down the VM.
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Shutting down writer");
+ HDFS_ASYNC_WRITE_SERVICE.shutdown();
+ LOG.info("Waiting until finishes all writes");
+ try {
+ HDFS_ASYNC_WRITE_SERVICE.awaitTermination(Long.MAX_VALUE,
+ TimeUnit.NANOSECONDS);
+ LOG.info("Finished all writes");
+ } catch (InterruptedException e) {
+ LOG.error("Could not finish all writes");
+ e.printStackTrace();
+ }
+ }
+ }));
+ }
+
+ /**
+ * Not for instantiation.
+ */
+ private AsyncHDFSWriteService() {
+ }
+
+ /**
+ * Writes given protobuf message to the given filesystem path in the
+ * background.
+ *
+ * @param message
+ * The proto message to write.
+ * @param fs
+ * The HDFS filesystem to write to.
+ * @param fileName
+ * The HDFS path to write the message to.
+ */
+ public static void writeToHDFS(final GeneratedMessage message,
+ final FileSystem fs, final String fileName) {
+ HDFS_ASYNC_WRITE_SERVICE.submit(new Runnable() {
+ @Override
+ public void run() {
+ Path pt = new Path(fileName);
+ try {
+ LOG.info("Writing " + fileName + " at " + fs.getUri());
+ OutputStream wrappedStream = fs.create(pt, true).getWrappedStream();
+ message.writeTo(wrappedStream);
+ wrappedStream.close();
+ LOG.info("Done writing " + fileName);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseScenarioAndIntegrityWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseScenarioAndIntegrityWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseScenarioAndIntegrityWrapper.java
new file mode 100644
index 0000000..7a736f5
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseScenarioAndIntegrityWrapper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.utils;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Base wrapper class for {@link GiraphVertexScenarioWrapper},
+ * {@link MsgIntegrityViolationWrapper},
+ * {@link VertexValueIntegrityViolationWrapper}.
+ *
+ * author: semihsalihoglu
+ *
+ * @param <I>
+ * Vertex id
+ */
+@SuppressWarnings("rawtypes")
+public abstract class BaseScenarioAndIntegrityWrapper<
+ I extends WritableComparable> extends BaseWrapper {
+ /**
+ * Class of the type of the vertex IDs.
+ */
+ protected Class<I> vertexIdClass;
+
+ /**
+ * Default empty constructor.
+ */
+ protected BaseScenarioAndIntegrityWrapper() { };
+
+ /**
+ * Default constructor initializing the vertexIdClass.
+ * @param vertexIdClass vertex id class.
+ */
+ public BaseScenarioAndIntegrityWrapper(Class<I> vertexIdClass) {
+ initialize(vertexIdClass);
+ }
+
+ public Class<I> getVertexIdClass() {
+ return vertexIdClass;
+ }
+
+ /**
+ * Initializes vertex id class.
+ * @param vertexIdClass vertex id class.
+ */
+ public void initialize(Class<I> vertexIdClass) {
+ this.vertexIdClass = vertexIdClass;
+ }
+
+ @Override
+ public String toString() {
+ return "\nvertexIdClass: " + getVertexIdClass().getCanonicalName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseWrapper.java
new file mode 100644
index 0000000..e0a6d51
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseWrapper.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.utils;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Base class for all wrapper classes that wrap a protobuf.
+ *
+ * author: semihsalihoglu
+ */
+public abstract class BaseWrapper {
+
+ /**
+ * @param <U> type of the upperBound class.
+ * @param clazz a {@link Class} object that will be cast.
+ * @param upperBound another {@link Class} object that clazz will be cast
+ * into.
+ * @return clazz cast to upperBound.
+ */
+ @SuppressWarnings("unchecked")
+ protected <U> Class<U> castClassToUpperBound(Class<?> clazz,
+ Class<U> upperBound) {
+ if (!upperBound.isAssignableFrom(clazz)) {
+ throw new IllegalArgumentException("The class " + clazz.getName() +
+ " is not a subclass of " + upperBound.getName());
+ }
+ return (Class<U>) clazz;
+ }
+
+ /**
+ * Utility method to read the contents of a {@link ByteString} to the given
+ * {@link Writable}.
+ * @param byteString a {@link ByteString} object.
+ * @param writable a {@link Writable} object.
+ */
+ void fromByteString(ByteString byteString, Writable writable) {
+ if (writable != null) {
+ WritableUtils.readFieldsFromByteArray(byteString.toByteArray(), writable);
+ }
+ }
+
+ /**
+ * @param writable a {@link Writable} object.
+ * @return the contents of writable as {@link ByteString}.
+ */
+ ByteString toByteString(Writable writable) {
+ return ByteString.copyFrom(WritableUtils.writeToByteArray(writable));
+ }
+
+ /**
+ * Saves this wrapper object to a file.
+ * @param fileName the full path of the file to save this wrapper object.
+ * @throws IOException thrown when there is an exception during the writing.
+ */
+ public void save(String fileName) throws IOException {
+ try (FileOutputStream output = new FileOutputStream(fileName)) {
+ buildProtoObject().writeTo(output);
+ output.close();
+ }
+ }
+
+
+ /**
+ * Saves this wrapper object to a file in HDFS.
+ * @param fs {@link FileSystem} to use for saving to HDFS.
+ * @param fileName the full path of the file to save this wrapper object.
+ * @throws IOException thrown when there is an exception during the writing.
+ */
+ public void saveToHDFS(FileSystem fs, String fileName) throws IOException {
+ AsyncHDFSWriteService.writeToHDFS(buildProtoObject(), fs, fileName);
+ }
+
+ /**
+ * @return the protobuf representing this wrapper object.
+ */
+ public abstract GeneratedMessage buildProtoObject();
+
+ /**
+ * Loads a protocol buffer stored in a file into this wrapper object.
+ * @param fileName the full path of the file where the protocol buffer is
+ * stored.
+ */
+ public void load(String fileName) throws ClassNotFoundException, IOException,
+ InstantiationException, IllegalAccessException {
+ try (FileInputStream inputStream = new FileInputStream(fileName)) {
+ loadFromProto(parseProtoFromInputStream(inputStream));
+ }
+ }
+
+ /**
+ * Loads a protocol buffer stored in a file in HDFS into this wrapper object.
+ * @param fs {@link FileSystem} to use for reading from HDFS.
+ * @param fileName the full path of the file where the protocol buffer is
+ * stored.
+ */
+ public void loadFromHDFS(FileSystem fs, String fileName)
+ throws ClassNotFoundException, IOException, InstantiationException,
+ IllegalAccessException {
+ try (FSDataInputStream inputStream = fs.open(new Path(fileName))) {
+ loadFromProto(parseProtoFromInputStream(inputStream));
+ }
+ }
+
+ /**
+ * Constructs a protobuf representing this wrapper object from an
+ * {@link InputStream}.
+ * @param inputStream {@link InputStream} containing the contents of this
+ * wrapper object.
+ * @return the protobuf version of this wrapper object.
+ */
+ public abstract GeneratedMessage parseProtoFromInputStream(
+ InputStream inputStream) throws IOException;
+
+ /**
+ * Constructs this wrapper object from a protobuf.
+ * @param protoObject protobuf to read when constructing this wrapper object.
+ */
+ public abstract void loadFromProto(GeneratedMessage protoObject)
+ throws ClassNotFoundException, IOException, InstantiationException,
+ IllegalAccessException;
+
+ /**
+ * Add given URLs to the CLASSPATH before loading from HDFS. To do so, we hack
+ * the system class loader, assuming it is an URLClassLoader.
+ *
+ * XXX Setting the currentThread's context class loader has no effect on
+ * Class#forName().
+ *
+ * @see http://stackoverflow.com/a/12963811/390044
+ * @param fs {@link FileSystem} to use for reading from HDFS.
+ * @param fileName the name of the file in HDFS.
+ * @param classPaths a possible list of class paths that may contain the
+ * directories containing the file.
+ */
+ public void loadFromHDFS(FileSystem fs, String fileName, URL... classPaths)
+ throws ClassNotFoundException, InstantiationException,
+ IllegalAccessException, IOException {
+ for (URL url : classPaths) {
+ addPath(url);
+ }
+ loadFromHDFS(fs, fileName);
+ }
+
+ /**
+ * @param u
+ * the URL to add to the CLASSPATH
+ * @see http://stackoverflow.com/a/252967/390044
+ */
+ private static void addPath(URL u) {
+ // need to do add path to Classpath with reflection since the
+ // URLClassLoader.addURL(URL url) method is protected:
+ ClassLoader cl = ClassLoader.getSystemClassLoader();
+ if (cl instanceof URLClassLoader) {
+ URLClassLoader urlClassLoader = (URLClassLoader) cl;
+ Class<URLClassLoader> urlClass = URLClassLoader.class;
+ try {
+ Method method = urlClass.getDeclaredMethod("addURL",
+ new Class[] { URL.class });
+ method.setAccessible(true);
+ method.invoke(urlClassLoader, u);
+ } catch (NoSuchMethodException | SecurityException |
+ IllegalAccessException | IllegalArgumentException |
+ InvocationTargetException e) {
+ throw new IllegalStateException("Cannot add URL to system ClassLoader",
+ e);
+ }
+ } else {
+ throw new IllegalStateException(
+ "Cannot add URL to system ClassLoader of type " +
+ cl.getClass().getSimpleName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/CommonVertexMasterContextWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/CommonVertexMasterContextWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/CommonVertexMasterContextWrapper.java
new file mode 100644
index 0000000..e6a3858
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/CommonVertexMasterContextWrapper.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.debugger.GiraphAggregator.AggregatedValue;
+import org.apache.giraph.debugger.Scenario.CommonVertexMasterContext;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Wrapper class around
+ * {@link org.apache.giraph.debugger.Scenario.CommonVertexMasterContext}
+ * protocol buffer.
+ */
+@SuppressWarnings("rawtypes")
+public class CommonVertexMasterContextWrapper extends BaseWrapper {
+ /**
+ * Wraps the {@link ImmutableClassesGiraphConfiguration} which
+ * {@link org.apache.giraph.debugger.Scenario.CommonVertexMasterContext}
+ * exposes.
+ */
+ private ImmutableClassesGiraphConfiguration immutableClassesConfig = null;
+ /**
+ * Wraps the superstep number which
+ * {@link org.apache.giraph.debugger.Scenario.CommonVertexMasterContext}
+ * exposes.
+ */
+ private long superstepNo;
+ /**
+ * Wraps the totalNumVertices which
+ * {@link org.apache.giraph.debugger.Scenario.CommonVertexMasterContext}
+ * exposes.
+ */
+ private long totalNumVertices;
+ /**
+ * Wraps the totalNumEdges which
+ * {@link org.apache.giraph.debugger.Scenario.CommonVertexMasterContext}
+ * exposes.
+ */
+ private long totalNumEdges;
+ /**
+ * Wraps the aggregated values from the previous superstep which
+ * {@link org.apache.giraph.debugger.Scenario.CommonVertexMasterContext}
+ * exposes.
+ */
+ private List<AggregatedValueWrapper> previousAggregatedValueWrappers;
+
+ /**
+ * Default constructor. Initializes superstepNo, totalNumVertices, and
+ * totalNumEdges to -1. Initializes an empty aggregated values.
+ */
+ public CommonVertexMasterContextWrapper() {
+ this.superstepNo = -1;
+ this.totalNumVertices = -1;
+ this.totalNumEdges = -1;
+ this.previousAggregatedValueWrappers = new ArrayList<>();
+ }
+
+ /**
+ * Constructor with immutableClassesConfig, superstepNo, totalNumVertices,
+ * and totalNumEdges. Does not initialize previousAggregatedValueWrappers.
+ * @param immutableClassesConfig the
+ * {@link ImmutableClassesGiraphConfiguration} to initialize.
+ * @param superstepNo superstep number to initialize.
+ * @param totalNumVertices total number of vertices number to initialize.
+ * @param totalNumEdges total number of edges to initialize.
+ */
+ public CommonVertexMasterContextWrapper(
+ ImmutableClassesGiraphConfiguration immutableClassesConfig,
+ long superstepNo, long totalNumVertices, long totalNumEdges) {
+ this.immutableClassesConfig = immutableClassesConfig;
+ this.superstepNo = superstepNo;
+ this.totalNumVertices = totalNumVertices;
+ this.totalNumEdges = totalNumEdges;
+ }
+
+ public long getSuperstepNoWrapper() {
+ return superstepNo;
+ }
+
+ public long getTotalNumVerticesWrapper() {
+ return totalNumVertices;
+ }
+
+ public long getTotalNumEdgesWrapper() {
+ return totalNumEdges;
+ }
+
+ public void setSuperstepNoWrapper(long superstepNo) {
+ this.superstepNo = superstepNo;
+ }
+
+ public void setTotalNumVerticesWrapper(long totalNumVertices) {
+ this.totalNumVertices = totalNumVertices;
+ }
+
+ public void setTotalNumEdgesWrapper(long totalNumEdges) {
+ this.totalNumEdges = totalNumEdges;
+ }
+
+ /**
+ * Adds an aggregated value from the previous superstep.
+ * @param previousAggregatedValueWrapper an {@link AggregatedValueWrapper}
+ * object wrapping the aggregated value.
+ */
+ public void addPreviousAggregatedValue(
+ AggregatedValueWrapper previousAggregatedValueWrapper) {
+ this.previousAggregatedValueWrappers.add(previousAggregatedValueWrapper);
+ }
+
+ public void setPreviousAggregatedValues(
+ List<AggregatedValueWrapper> previousAggregatedValueWrappers) {
+ this.previousAggregatedValueWrappers = previousAggregatedValueWrappers;
+ }
+
+ public Collection<AggregatedValueWrapper> getPreviousAggregatedValues() {
+ return previousAggregatedValueWrappers;
+ }
+
+ public ImmutableClassesGiraphConfiguration getConfig() {
+ return immutableClassesConfig;
+ }
+
+ public void setConfig(
+ ImmutableClassesGiraphConfiguration immutableClassesConfig) {
+ this.immutableClassesConfig = immutableClassesConfig;
+ }
+
+ @Override
+ public GeneratedMessage buildProtoObject() {
+ CommonVertexMasterContext.Builder commonContextBuilder =
+ CommonVertexMasterContext.newBuilder();
+ commonContextBuilder.setConf(toByteString(immutableClassesConfig))
+ .setSuperstepNo(getSuperstepNoWrapper())
+ .setTotalNumVertices(getTotalNumVerticesWrapper())
+ .setTotalNumEdges(getTotalNumEdgesWrapper());
+
+ for (AggregatedValueWrapper aggregatedValueWrapper :
+ getPreviousAggregatedValues()) {
+ commonContextBuilder
+ .addPreviousAggregatedValue((AggregatedValue) aggregatedValueWrapper
+ .buildProtoObject());
+ }
+ return commonContextBuilder.build();
+ }
+
+ @Override
+ public void loadFromProto(GeneratedMessage generatedMessage)
+ throws ClassNotFoundException, IOException, InstantiationException,
+ IllegalAccessException {
+ CommonVertexMasterContext commonContext = (CommonVertexMasterContext)
+ generatedMessage;
+ GiraphConfiguration config = new GiraphConfiguration();
+ fromByteString(commonContext.getConf(), config);
+ ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
+ new ImmutableClassesGiraphConfiguration(config);
+ setConfig(immutableClassesGiraphConfiguration);
+
+ setSuperstepNoWrapper(commonContext.getSuperstepNo());
+ setTotalNumVerticesWrapper(commonContext.getTotalNumVertices());
+ setTotalNumEdgesWrapper(commonContext.getTotalNumEdges());
+
+ for (AggregatedValue previousAggregatedValueProto : commonContext
+ .getPreviousAggregatedValueList()) {
+ AggregatedValueWrapper aggregatedValueWrapper =
+ new AggregatedValueWrapper();
+ aggregatedValueWrapper.loadFromProto(previousAggregatedValueProto);
+ addPreviousAggregatedValue(aggregatedValueWrapper);
+ }
+ }
+
+ @Override
+ public GeneratedMessage parseProtoFromInputStream(InputStream inputStream)
+ throws IOException {
+ return CommonVertexMasterContext.parseFrom(inputStream);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("\nconfig: " + immutableClassesConfig.toString());
+ stringBuilder.append("superstepNo: " + getSuperstepNoWrapper());
+ stringBuilder.append("\ntotalNumVertices: " + totalNumVertices);
+ stringBuilder.append("\ntotalNumEdges: " + totalNumEdges);
+ stringBuilder.append("\nnumAggregators: " +
+ getPreviousAggregatedValues().size());
+ for (AggregatedValueWrapper aggregatedValueWrapper :
+ getPreviousAggregatedValues()) {
+ stringBuilder.append("\n" + aggregatedValueWrapper);
+ }
+ return stringBuilder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/DebuggerUtils.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/DebuggerUtils.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/DebuggerUtils.java
new file mode 100644
index 0000000..03fdf68
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/DebuggerUtils.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Contains common utility classes shared one or more of:
+ * <ul>
+ * <li>Graft instrumenter and the
+ * <li>server that serves data to Graft GUI by talking to HDFS
+ * <li>Wrapper classes around the scenario protocol buffers that are stored
+ * under {@link org.apache.giraph.debugger.utils}.
+ * </ul>
+ *
+ * author semihsalihoglu
+ */
+public class DebuggerUtils {
+
+ /**
+ * The path to the HDFS root for storing Graft traces.
+ */
+ public static final String TRACE_ROOT = System.getProperty(
+ "giraph.debugger.traceRootAtHDFS",
+ "/user/" + System.getProperty("user.name") + "/giraph-debug-traces");
+ /**
+ * The path to the HDFS root for storing cached Giraph job jars.
+ */
+ public static final String JARCACHE_HDFS = System.getProperty(
+ "giraph.debugger.jobCacheAtHDFS", TRACE_ROOT + "/jars");
+ /**
+ * The path to the local root directory for storing cached Giraph job jars.
+ */
+ public static final String JARCACHE_LOCAL = System.getProperty(
+ "giraph.debugger.jobCacheLocal", System.getenv("HOME") +
+ "/.giraph-debug/jars");
+
+ /**
+ * Enumeration of different trace files Graft saves in HDFS.
+ */
+ public enum DebugTrace {
+ /**
+ * Regular trace capturing a vertex computation.
+ */
+ VERTEX_REGULAR("regular vertex"),
+ /**
+ * Captured exception from a vertex.
+ */
+ VERTEX_EXCEPTION("exception from a vertex"),
+ /**
+ * All traces of a particular vertex.
+ */
+ VERTEX_ALL,
+ /**
+ * Captured message integrity violations.
+ */
+ INTEGRITY_MESSAGE_ALL("invalid messages"),
+ /**
+ * Trace of the single message violating constraints.
+ */
+ INTEGRITY_MESSAGE_SINGLE_VERTEX("vertex sending invalid messages"),
+ /**
+ * Trace of the vertex computation that sends an invalid message.
+ */
+ INTEGRITY_VERTEX("vertex having invalid value"),
+ /**
+ * Regular trace of a MasterCompute.
+ */
+ MASTER_REGULAR("regular MasterCompute"),
+ /**
+ * Trace capturing exception thrown from a MasterCompute.
+ */
+ MASTER_EXCEPTION("exception from MasterCompute"),
+ /**
+ * All traces of MasterCompute.
+ */
+ MASTER_ALL,
+ /**
+ * The jar signature that links the instrumented jar.
+ */
+ JAR_SIGNATURE;
+
+ /**
+ * The label of this debug trace.
+ */
+ private final String label;
+
+ /**
+ * Creates a DebugTrace instance without a label.
+ */
+ private DebugTrace() {
+ this.label = null;
+ }
+
+ /**
+ * Creates a DebugTrace instance with a specific label.
+ * @param label The label.
+ */
+ private DebugTrace(String label) {
+ this.label = label;
+ }
+
+ /**
+ * Returns the label.
+ * @return the label
+ */
+ public String getLabel() {
+ return label;
+ }
+ }
+
+ /**
+ * File name prefix for regular traces.
+ */
+ public static final String PREFIX_TRACE_REGULAR = "reg";
+ /**
+ * File name prefix for exception traces.
+ */
+ public static final String PREFIX_TRACE_EXCEPTION = "err";
+ /**
+ * File name prefix for vertex value integrity traces.
+ */
+ public static final String PREFIX_TRACE_VERTEX = "vv";
+ /**
+ * File name prefix for message integrity traces.
+ */
+ public static final String PREFIX_TRACE_MESSAGE = "msg";
+
+ /**
+ * Disallows creating instances of this class.
+ */
+ private DebuggerUtils() { }
+
+ /**
+ * Makes a clone of a writable object. Giraph sometimes reuses and overwrites
+ * the bytes inside {@link Writable} objects. For example, when reading the
+ * incoming messages inside a {@link Computation} class through the iterator
+ * Giraph supplies, Giraph uses only one object. Therefore in order to keep a
+ * pointer to particular object, we need to clone it.
+ *
+ * @param <T>
+ * Type of the clazz.
+ * @param writableToClone
+ * Writable object to clone.
+ * @param clazz
+ * Class of writableToClone.
+ * @return a clone of writableToClone.
+ */
+ public static <T extends Writable> T makeCloneOf(T writableToClone,
+ Class<T> clazz) {
+ T idCopy = newInstance(clazz);
+ // Return value is null if clazz is assignable to NullWritable.
+ if (idCopy == null) {
+ return writableToClone;
+ }
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(
+ byteArrayOutputStream);
+ try {
+ writableToClone.write(dataOutputStream);
+ } catch (IOException e) {
+ // Throwing a runtime exception because the methods that call other
+ // methods
+ // such as addNeighborWrapper or addOutgoingMessageWrapper, implement
+ // abstract classes
+ // or interfaces of Giraph that we can't edit to include a throws
+ // statement.
+ throw new RuntimeException(e);
+ }
+ //
+ if (byteArrayOutputStream.toByteArray() != null) {
+ WritableUtils.readFieldsFromByteArray(
+ byteArrayOutputStream.toByteArray(), idCopy);
+ byteArrayOutputStream.reset();
+ }
+ return idCopy;
+ }
+
+ /**
+ * Instantiates a new object from the given class.
+ *
+ * @param <T> The type of the new instance to create.
+ * @param theClass The class of the new instance to create.
+ * @return The newly created instance.
+ */
+ public static <T> T newInstance(Class<T> theClass) {
+ return NullWritable.class.isAssignableFrom(theClass) ? null :
+ ReflectionUtils.newInstance(theClass);
+ }
+
+ /**
+ * Returns the full trace file name for the given type of debug trace. One or
+ * more of the passed arguments will be used in the file name.
+ *
+ * @param debugTrace The debug trace for generating the file name.
+ * @param jobId The job id of the job the debug trace belongs to.
+ * @param superstepNo The superstep number of the debug trace.
+ * @param vertexId The vertex id of the debug trace.
+ * @param taskId The task id of the debug trace.
+ * @return The full trace file name.
+ */
+ public static String getFullTraceFileName(DebugTrace debugTrace,
+ String jobId, Long superstepNo, String vertexId, String taskId) {
+ return getTraceFileRoot(jobId) + "/" +
+ getTraceFileName(debugTrace, superstepNo, vertexId, taskId);
+ }
+
+ /**
+ * A convenience method around
+ * {@link #getFullTraceFileName(DebugTrace, String, Long, String, Integer)}.
+ *
+ * @param superstepNo The superstep number of the trace.
+ * @param jobId The job id of the trace.
+ * @param taskId The task id of the trace.
+ * @return The full trace file name for debug trace of message integrity.
+ */
+ public static String getMessageIntegrityAllTraceFullFileName(
+ long superstepNo, String jobId, String taskId) {
+ return getFullTraceFileName(DebugTrace.INTEGRITY_MESSAGE_ALL, jobId,
+ superstepNo, null /* no vertex Id */, taskId);
+ }
+
+ /**
+ * A convenience method around
+ * {@link #getFullTraceFileName(DebugTrace, String, Long, String, Integer)}.
+ *
+ * @param masterDebugTrace The debug trace for generating the file name.
+ * @param jobId The job id the debug trace belongs to.
+ * @param superstepNo The superstep number.
+ * @return The full trace file name of the master compute trace.
+ */
+ public static String getFullMasterTraceFileName(DebugTrace masterDebugTrace,
+ String jobId, Long superstepNo) {
+ return getFullTraceFileName(masterDebugTrace, jobId, superstepNo,
+ null /* no vertex Id */, null /* no trace Id */);
+ }
+
+ /**
+ * A convenience method around
+ * {@link #getFullTraceFileName(DebugTrace, String, Long, String, Integer)}.
+ *
+ * @param debugTrace The debug trace for generating the file name.
+ * @param jobId The job id the debug trace belongs to.
+ * @param superstepNo The superstep number.
+ * @param vertexId The vertex id of the debug trace.
+ * @return The full trace file name without the trace id.
+ */
+ public static String getFullTraceFileName(DebugTrace debugTrace,
+ String jobId, Long superstepNo, String vertexId) {
+ return getFullTraceFileName(debugTrace, jobId, superstepNo, vertexId,
+ null /* no trace Id */);
+ }
+
+ /**
+ * Maps debug trace to file names with additional parameters.
+ *
+ * @param debugTrace The debug trace.
+ * @param superstepNo The superstep number.
+ * @param vertexId The vertex id.
+ * @param taskId The task id.
+ * @return The file name that corresponds to the debug trace.
+ */
+ private static String getTraceFileName(DebugTrace debugTrace,
+ Long superstepNo, String vertexId, String taskId) {
+ String format = getTraceFileFormat(debugTrace);
+ switch (debugTrace) {
+ case VERTEX_REGULAR:
+ return String.format(format, superstepNo, vertexId);
+ case VERTEX_EXCEPTION:
+ return String.format(format, superstepNo, vertexId);
+ case INTEGRITY_MESSAGE_ALL:
+ return String.format(format, taskId, superstepNo);
+ case INTEGRITY_MESSAGE_SINGLE_VERTEX:
+ return String.format(format, superstepNo, vertexId);
+ case INTEGRITY_VERTEX:
+ return String.format(format, superstepNo, vertexId);
+ case MASTER_REGULAR:
+ return String.format(format, superstepNo);
+ case MASTER_EXCEPTION:
+ return String.format(format, superstepNo);
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Returns the file name of the trace file given the three parameters. Pass
+ * arbitrary vertexId for traces which do not require a vertexId.
+ *
+ * @param debugTrace
+ * The debug trace.
+ * @return The file name format for the debug trace to be used with
+ * {@link String#format(String, Object...)}.
+ */
+ public static String getTraceFileFormat(DebugTrace debugTrace) {
+ // XXX is this function giving the String format? or regex? Seems latter.
+ switch (debugTrace) {
+ case VERTEX_REGULAR:
+ return PREFIX_TRACE_REGULAR + "_stp_%s_vid_%s.tr";
+ case VERTEX_EXCEPTION:
+ return PREFIX_TRACE_EXCEPTION + "_stp_%s_vid_%s.tr";
+ case VERTEX_ALL:
+ return String.format("(%s|%s)%s", PREFIX_TRACE_REGULAR,
+ PREFIX_TRACE_EXCEPTION, "_stp_%s_vid_%s.tr");
+ case INTEGRITY_MESSAGE_ALL:
+ return "task_%s_msg_intgrty_stp_%s.tr";
+ case INTEGRITY_MESSAGE_SINGLE_VERTEX:
+ return PREFIX_TRACE_MESSAGE + "_intgrty_stp_%s_vid_%s.tr";
+ case INTEGRITY_VERTEX:
+ return PREFIX_TRACE_VERTEX + "_intgrty_stp_%s_vid_%s.tr";
+ case MASTER_REGULAR:
+ return "master_" + PREFIX_TRACE_REGULAR + "_stp_%s.tr";
+ case MASTER_EXCEPTION:
+ return "master_" + PREFIX_TRACE_EXCEPTION + "_stp_%s.tr";
+ case MASTER_ALL:
+ return String.format("master_(%s|%s)_%s", PREFIX_TRACE_REGULAR,
+ PREFIX_TRACE_EXCEPTION, "_stp_%s.tr");
+ default:
+ throw new IllegalArgumentException("DebugTrace not supported.");
+ }
+ }
+
+ /**
+ * Maps prefix back to the corresponding debug trace.
+ *
+ * @param prefix The file name prefix.
+ * @return The debug trace value that corresponds to given prefix.
+ * @throws IllegalArgumentException Thrown if prefix isn't supported.
+ */
+ public static DebugTrace getVertexDebugTraceForPrefix(String prefix) {
+ if (prefix.equals(PREFIX_TRACE_REGULAR)) {
+ return DebugTrace.VERTEX_REGULAR;
+ } else if (prefix.equals(PREFIX_TRACE_EXCEPTION)) {
+ return DebugTrace.VERTEX_EXCEPTION;
+ } else if (prefix.equals(PREFIX_TRACE_VERTEX)) {
+ return DebugTrace.INTEGRITY_VERTEX;
+ } else if (prefix.equals(PREFIX_TRACE_MESSAGE)) {
+ return DebugTrace.INTEGRITY_MESSAGE_SINGLE_VERTEX;
+ } else {
+ throw new IllegalArgumentException("Prefix not supported");
+ }
+ }
+
+ /**
+ * Returns the root directory of the trace files for the given job.
+ *
+ * @param jobId The job id of the job.
+ * @return The root path for storing traces for the job.
+ */
+ public static String getTraceFileRoot(String jobId) {
+ return String.format("%s/%s", DebuggerUtils.TRACE_ROOT, jobId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/ExceptionWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/ExceptionWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/ExceptionWrapper.java
new file mode 100644
index 0000000..cc0598e
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/ExceptionWrapper.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.giraph.debugger.Scenario.Exception;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Wrapper class around {@link org.apache.giraph.debugger.Scenario.Exception}
+ * protocol buffer.
+ *
+ * author semihsalihoglu
+ */
+public class ExceptionWrapper extends BaseWrapper {
+ /**
+ * The error message of the exception.
+ */
+ private String errorMessage = "";
+ /**
+ * The stack trace string of the exception.
+ */
+ private String stackTrace = "";
+
+ /**
+ * Default constructor.
+ */
+ public ExceptionWrapper() {
+ }
+
+ /**
+ * Constructor with an error message and stack trace.
+ *
+ * @param errorMessage
+ * The error message of the exception.
+ * @param stackTrace
+ * The stack trace string obtained from
+ * {@link java.lang.Exception#getStackTrace()}.
+ */
+ public ExceptionWrapper(String errorMessage, String stackTrace) {
+ this.errorMessage = errorMessage;
+ this.stackTrace = stackTrace;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("errorMessage: " + getErrorMessage());
+ stringBuilder.append("\nstackTrace: " + getStackTrace());
+ return stringBuilder.toString();
+ }
+
+ public String getErrorMessage() {
+ // We append with "" to guard against null pointer exceptions
+ return "" + errorMessage;
+ }
+
+ public String getStackTrace() {
+ // We append with "" to guard against null pointer exceptions
+ return "" + stackTrace;
+ }
+
+ @Override
+ public GeneratedMessage buildProtoObject() {
+ Exception.Builder exceptionBuilder = Exception.newBuilder();
+ exceptionBuilder.setMessage(getErrorMessage());
+ exceptionBuilder.setStackTrace(getStackTrace());
+ return exceptionBuilder.build();
+ }
+
+ @Override
+ public GeneratedMessage parseProtoFromInputStream(InputStream inputStream)
+ throws IOException {
+ return Exception.parseFrom(inputStream);
+ }
+
+ @Override
+ public void loadFromProto(GeneratedMessage generatedMessage)
+ throws ClassNotFoundException, IOException, InstantiationException,
+ IllegalAccessException {
+ Exception exceptionProto = (Exception) generatedMessage;
+ this.errorMessage = exceptionProto.getMessage();
+ this.stackTrace = exceptionProto.getStackTrace();
+ }
+
+ public void setErrorMessage(String errorMessage) {
+ // We append "" to guard against null pointer exceptions
+ this.errorMessage = "" + errorMessage;
+ }
+
+ public void setStackTrace(String stackTrace) {
+ // We append "" to guard against null pointer exceptions
+ this.stackTrace = "" + stackTrace;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphMasterScenarioWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphMasterScenarioWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphMasterScenarioWrapper.java
new file mode 100644
index 0000000..0831adc
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphMasterScenarioWrapper.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.giraph.debugger.Scenario.CommonVertexMasterContext;
+import org.apache.giraph.debugger.Scenario.Exception;
+import org.apache.giraph.debugger.Scenario.GiraphMasterScenario;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Wrapper class around
+ * {@link org.apache.giraph.debugger.Scenario.GiraphMasterScenario} protocol
+ * buffer.
+ *
+ * author semihsalihoglu
+ */
+public class GiraphMasterScenarioWrapper extends BaseWrapper {
+ /**
+ * The MasterCompute class under debugging.
+ */
+ private String masterClassUnderTest;
+ /**
+ * The common wrapper instance.
+ */
+ private CommonVertexMasterContextWrapper commonVertexMasterContextWrapper =
+ null;
+ /**
+ * The exception wrapper instance.
+ */
+ private ExceptionWrapper exceptionWrapper = null;
+
+ /**
+ * Default constructor.
+ */
+ public GiraphMasterScenarioWrapper() {
+ }
+
+ /**
+ * Constructor with a MasterCompute class name.
+ *
+ * @param masterClassUnderTest The MasterCompute class name.
+ */
+ public GiraphMasterScenarioWrapper(String masterClassUnderTest) {
+ this.masterClassUnderTest = masterClassUnderTest;
+ this.commonVertexMasterContextWrapper = new
+ CommonVertexMasterContextWrapper();
+ this.exceptionWrapper = null;
+ }
+
+ public String getMasterClassUnderTest() {
+ return masterClassUnderTest;
+ }
+
+ public CommonVertexMasterContextWrapper getCommonVertexMasterContextWrapper()
+ {
+ return commonVertexMasterContextWrapper;
+ }
+
+ public void setCommonVertexMasterContextWrapper(
+ CommonVertexMasterContextWrapper commonVertexMasterContextWrapper) {
+ this.commonVertexMasterContextWrapper = commonVertexMasterContextWrapper;
+ }
+
+ public ExceptionWrapper getExceptionWrapper() {
+ return exceptionWrapper;
+ }
+
+ public void setExceptionWrapper(ExceptionWrapper exceptionWrapper) {
+ this.exceptionWrapper = exceptionWrapper;
+ }
+
+ /**
+ * Checks if this has an exception wrapper.
+ * @return True if this has an exception wrapper.
+ */
+ public boolean hasExceptionWrapper() {
+ return exceptionWrapper != null;
+ }
+
+ @Override
+ public GeneratedMessage buildProtoObject() {
+ GiraphMasterScenario.Builder giraphMasterScenarioBuilder =
+ GiraphMasterScenario.newBuilder();
+ giraphMasterScenarioBuilder.setMasterClassUnderTest(masterClassUnderTest);
+ giraphMasterScenarioBuilder
+ .setCommonContext((CommonVertexMasterContext)
+ commonVertexMasterContextWrapper.buildProtoObject());
+ if (hasExceptionWrapper()) {
+ giraphMasterScenarioBuilder.setException((Exception) exceptionWrapper
+ .buildProtoObject());
+ }
+ return giraphMasterScenarioBuilder.build();
+ }
+
+ @Override
+ public GeneratedMessage parseProtoFromInputStream(InputStream inputStream)
+ throws IOException {
+ return GiraphMasterScenario.parseFrom(inputStream);
+ }
+
+ @Override
+ public void loadFromProto(GeneratedMessage protoObject)
+ throws ClassNotFoundException, IOException, InstantiationException,
+ IllegalAccessException {
+ GiraphMasterScenario giraphMasterScenario = (GiraphMasterScenario)
+ protoObject;
+ this.masterClassUnderTest = giraphMasterScenario.getMasterClassUnderTest();
+ this.commonVertexMasterContextWrapper = new
+ CommonVertexMasterContextWrapper();
+ this.commonVertexMasterContextWrapper.loadFromProto(giraphMasterScenario
+ .getCommonContext());
+ if (giraphMasterScenario.hasException()) {
+ this.exceptionWrapper = new ExceptionWrapper();
+ this.exceptionWrapper.loadFromProto(giraphMasterScenario.getException());
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("masterClassUnderTest: " + masterClassUnderTest);
+ stringBuilder.append("\n" + commonVertexMasterContextWrapper.toString());
+ stringBuilder.append("\nhasExceptionWrapper: " + hasExceptionWrapper());
+ if (hasExceptionWrapper()) {
+ stringBuilder.append("\n" + exceptionWrapper.toString());
+ }
+ return stringBuilder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphVertexScenarioWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphVertexScenarioWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphVertexScenarioWrapper.java
new file mode 100644
index 0000000..0f36605
--- /dev/null
+++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphVertexScenarioWrapper.java
@@ -0,0 +1,819 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.debugger.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.giraph.debugger.Scenario.CommonVertexMasterContext;
+import org.apache.giraph.debugger.Scenario.Exception;
+import org.apache.giraph.debugger.Scenario.GiraphVertexScenario;
+import org.apache.giraph.debugger.Scenario.GiraphVertexScenario.VertexContext;
+import org.apache.giraph.debugger.Scenario.GiraphVertexScenario.VertexContext.Neighbor;
+import org.apache.giraph.debugger.Scenario.GiraphVertexScenario.VertexContext.OutgoingMessage;
+import org.apache.giraph.debugger.Scenario.GiraphVertexScenario.VertexScenarioClasses;
+import org.apache.giraph.graph.Computation;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Wrapper class around
+ * {@link org.apache.giraph.debugger.Scenario.GiraphVertexScenario} protocol
+ * buffer. In {@link org.apache.giraph.debugger.Scenario.GiraphVertexScenario}
+ * most fields are stored as serialized byte arrays and this class gives them
+ * access through the java classes that those byte arrays serialize.
+ *
+ * @param <I>
+ * vertex ID class.
+ * @param <V>
+ * vertex value class.
+ * @param <E>
+ * edge value class.
+ * @param <M1>
+ * incoming message class.
+ * @param <M2>
+ * outgoing message class.
+ *
+ * author Brian Truong
+ */
+@SuppressWarnings("rawtypes")
+public class GiraphVertexScenarioWrapper<I extends WritableComparable, V extends
+ Writable, E extends Writable, M1 extends Writable, M2 extends Writable>
+ extends BaseWrapper {
+
+ /**
+ * Vertex scenario classes wrapper instance.
+ */
+ private VertexScenarioClassesWrapper vertexScenarioClassesWrapper = null;
+ /**
+ * Vertex context wrapper instance.
+ */
+ private VertexContextWrapper contextWrapper = null;
+ /**
+ * Exception wrapper instance.
+ */
+ private ExceptionWrapper exceptionWrapper = null;
+
+ /**
+ * Empty constructor to be used for loading from HDFS.
+ */
+ public GiraphVertexScenarioWrapper() {
+ }
+
+ /**
+ * Constructor with classes.
+ *
+ * @param classUnderTest The Computation class under test.
+ * @param vertexIdClass The vertex id class.
+ * @param vertexValueClass The vertex value class.
+ * @param edgeValueClass The edge value class.
+ * @param incomingMessageClass The incoming message class.
+ * @param outgoingMessageClass The outgoing message class.
+ */
+ public GiraphVertexScenarioWrapper(
+ Class<? extends Computation<I, V, E, M1, M2>> classUnderTest,
+ Class<I> vertexIdClass, Class<V> vertexValueClass, Class<E> edgeValueClass,
+ Class<M1> incomingMessageClass, Class<M2> outgoingMessageClass) {
+ this.vertexScenarioClassesWrapper = new VertexScenarioClassesWrapper(
+ classUnderTest, vertexIdClass, vertexValueClass, edgeValueClass,
+ incomingMessageClass, outgoingMessageClass);
+ this.contextWrapper = new VertexContextWrapper();
+ }
+
+ public VertexContextWrapper getContextWrapper() {
+ return contextWrapper;
+ }
+
+ public void setContextWrapper(VertexContextWrapper contextWrapper) {
+ this.contextWrapper = contextWrapper;
+ }
+
+ /**
+ * Checks if this has an exception wrapper.
+ * @return True if this has an exception wrapper.
+ */
+ public boolean hasExceptionWrapper() {
+ return exceptionWrapper != null;
+ }
+
+ public ExceptionWrapper getExceptionWrapper() {
+ return exceptionWrapper;
+ }
+
+ public void setExceptionWrapper(ExceptionWrapper exceptionWrapper) {
+ this.exceptionWrapper = exceptionWrapper;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(super.toString());
+ stringBuilder.append("\n" + vertexScenarioClassesWrapper.toString());
+ stringBuilder.append("\n" + contextWrapper.toString());
+ stringBuilder.append("\nhasExceptionWrapper: " + hasExceptionWrapper());
+ if (hasExceptionWrapper()) {
+ stringBuilder.append("\n" + exceptionWrapper.toString());
+ }
+ return stringBuilder.toString();
+ }
+
+ /**
+ * Wrapper class around
+ * {@link
+ * org.apache.giraph.debugger.Scenario.GiraphVertexScenario.VertexContext}
+ * protocol buffer.
+ *
+ * author semihsalihoglu
+ */
+ public class VertexContextWrapper extends BaseWrapper {
+ /**
+ * Vertex master context wrapper instance.
+ */
+ private CommonVertexMasterContextWrapper commonVertexMasterContextWrapper;
+ /**
+ * Reference to the vertex id.
+ */
+ private I vertexIdWrapper;
+ /**
+ * Reference to the vertex value before the computation.
+ */
+ private V vertexValueBeforeWrapper;
+ /**
+ * Reference to the vertex value after the computation.
+ */
+ private V vertexValueAfterWrapper;
+ /**
+ * List of incoming messages.
+ */
+ private ArrayList<M1> inMsgsWrapper;
+ /**
+ * List of neighbor vertices.
+ */
+ private ArrayList<NeighborWrapper> neighborsWrapper;
+ /**
+ * List of outgoing messages.
+ */
+ private ArrayList<OutgoingMessageWrapper> outMsgsWrapper;
+
+ /**
+ * Default constructor.
+ */
+ public VertexContextWrapper() {
+ reset();
+ }
+
+ /**
+ * Initializes/resets this instances.
+ */
+ public void reset() {
+ this.commonVertexMasterContextWrapper = new
+ CommonVertexMasterContextWrapper();
+ this.vertexIdWrapper = null;
+ this.vertexValueBeforeWrapper = null;
+ this.vertexValueAfterWrapper = null;
+ this.inMsgsWrapper = new ArrayList<M1>();
+ this.neighborsWrapper = new ArrayList<NeighborWrapper>();
+ this.outMsgsWrapper = new ArrayList<OutgoingMessageWrapper>();
+ }
+
+ public CommonVertexMasterContextWrapper
+ getCommonVertexMasterContextWrapper() {
+ return commonVertexMasterContextWrapper;
+ }
+
+ public void setCommonVertexMasterContextWrapper(
+ CommonVertexMasterContextWrapper commonVertexMasterContextWrapper) {
+ this.commonVertexMasterContextWrapper = commonVertexMasterContextWrapper;
+ }
+
+ public I getVertexIdWrapper() {
+ return vertexIdWrapper;
+ }
+
+ public void setVertexIdWrapper(I vertexId) {
+ this.vertexIdWrapper = vertexId;
+ }
+
+ public V getVertexValueBeforeWrapper() {
+ return vertexValueBeforeWrapper;
+ }
+
+ public V getVertexValueAfterWrapper() {
+ return vertexValueAfterWrapper;
+ }
+
+ public void setVertexValueBeforeWrapper(V vertexValueBefore) {
+ // Because Giraph does not create new objects for writables, we need
+ // to make a clone them to get a copy of the objects. Otherwise, if
+ // we call setVertexValueBeforeWrapper and then setVertexValueAfterWrapper
+ // both of our copies end up pointing to the same object (in this case to
+ // the value passed to setVertexValueAfterWrapper, because it was called
+ // later).
+ this.vertexValueBeforeWrapper = DebuggerUtils.makeCloneOf(
+ vertexValueBefore, getVertexScenarioClassesWrapper().vertexValueClass);
+ }
+
+ public void setVertexValueAfterWrapper(V vertexValueAfter) {
+ // See the explanation for making a clone inside
+ // setVertexValueBeforeWrapper
+ this.vertexValueAfterWrapper = DebuggerUtils.makeCloneOf(
+ vertexValueAfter, getVertexScenarioClassesWrapper().vertexValueClass);
+ }
+
+ /**
+ * Captures an incoming message by keeping a clone.
+ *
+ * @param message The message to capture.
+ */
+ public void addIncomingMessageWrapper(M1 message) {
+ // See the explanation for making a clone inside
+ // setVertexValueBeforeWrapper
+ inMsgsWrapper.add(DebuggerUtils.makeCloneOf(message,
+ getVertexScenarioClassesWrapper().incomingMessageClass));
+ }
+
+ public Collection<M1> getIncomingMessageWrappers() {
+ return inMsgsWrapper;
+ }
+
+ /**
+ * Captures an outgoing message by keeping a clone.
+ *
+ * @param receiverId The vertex id that receives the message.
+ * @param message The message being sent to be captured.
+ */
+ public void addOutgoingMessageWrapper(I receiverId, M2 message) {
+ // See the explanation for making a clone inside
+ // setVertexValueBeforeWrapper
+ outMsgsWrapper.add(new OutgoingMessageWrapper(DebuggerUtils.makeCloneOf(
+ receiverId, getVertexScenarioClassesWrapper().vertexIdClass),
+ DebuggerUtils.makeCloneOf(message,
+ getVertexScenarioClassesWrapper().outgoingMessageClass)));
+ }
+
+ public Collection<OutgoingMessageWrapper> getOutgoingMessageWrappers() {
+ return outMsgsWrapper;
+ }
+
+ /**
+ * Adds a neighbor vertex.
+ *
+ * @param neighborId The neighbor vertex id.
+ * @param edgeValue The value of the edge that connects to the neighbor.
+ */
+ public void addNeighborWrapper(I neighborId, E edgeValue) {
+ // See the explanation for making a clone inside
+ // setVertexValueBeforeWrapper
+ neighborsWrapper.add(new NeighborWrapper(DebuggerUtils.makeCloneOf(
+ neighborId, getVertexScenarioClassesWrapper().vertexIdClass),
+ DebuggerUtils.makeCloneOf(edgeValue,
+ getVertexScenarioClassesWrapper().edgeValueClass)));
+ }
+
+ public Collection<NeighborWrapper> getNeighborWrappers() {
+ return neighborsWrapper;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(commonVertexMasterContextWrapper.toString());
+ stringBuilder.append("\nvertexId: " + getVertexIdWrapper());
+ stringBuilder.append("\nvertexValueBefore: " +
+ getVertexValueBeforeWrapper());
+ stringBuilder.append("\nvertexValueAfter: " +
+ getVertexValueAfterWrapper());
+ stringBuilder.append("\nnumNeighbors: " + getNeighborWrappers().size());
+
+ for (NeighborWrapper neighborWrapper : getNeighborWrappers()) {
+ stringBuilder.append("\n" + neighborWrapper.toString());
+ }
+
+ for (M1 incomingMesage : getIncomingMessageWrappers()) {
+ stringBuilder.append("\nincoming message: " + incomingMesage);
+ }
+
+ stringBuilder.append("\nnumOutgoingMessages: " +
+ getOutgoingMessageWrappers().size());
+ for (OutgoingMessageWrapper outgoingMessageWrapper :
+ getOutgoingMessageWrappers()) {
+ stringBuilder.append("\n" + outgoingMessageWrapper);
+ }
+ return stringBuilder.toString();
+ }
+
+ /**
+ * Wrapper around scenario.giraphscenerio.neighbor (in scenario.proto).
+ *
+ * author Brian Truong
+ */
+ public class NeighborWrapper extends BaseWrapper {
+
+ /**
+ * Neighbor vertex id.
+ */
+ private I nbrId;
+ /**
+ * Value of the edge that points to the neighbor.
+ */
+ private E edgeValue;
+
+ /**
+ * Constructor with the fields.
+ *
+ * @param nbrId Neighbor vertex id.
+ * @param edgeValue Value of the edge that points to the neighbor.
+ */
+ public NeighborWrapper(I nbrId, E edgeValue) {
+ this.nbrId = nbrId;
+ this.edgeValue = edgeValue;
+ }
+
+ /**
+ * Default constructor.
+ */
+ public NeighborWrapper() {
+ }
+
+ public I getNbrId() {
+ return nbrId;
+ }
+
+ public E getEdgeValue() {
+ return edgeValue;
+ }
+
+ @Override
+ public String toString() {
+ return "neighbor: nbrId: " + nbrId + " edgeValue: " + edgeValue;
+ }
+
+ @Override
+ public GeneratedMessage buildProtoObject() {
+ Neighbor.Builder neighborBuilder = Neighbor.newBuilder();
+ neighborBuilder.setNeighborId(toByteString(nbrId));
+ if (edgeValue != null) {
+ neighborBuilder.setEdgeValue(toByteString(edgeValue));
+ } else {
+ neighborBuilder.clearEdgeValue();
+ }
+ return neighborBuilder.build();
+ }
+
+ @Override
+ public GeneratedMessage parseProtoFromInputStream(InputStream inputStream)
+ throws IOException {
+ return Neighbor.parseFrom(inputStream);
+ }
+
+ @Override
+ public void loadFromProto(GeneratedMessage protoObject)
+ throws ClassNotFoundException, IOException, InstantiationException,
+ IllegalAccessException {
+ Neighbor neighbor = (Neighbor) protoObject;
+ this.nbrId = DebuggerUtils
+ .newInstance(vertexScenarioClassesWrapper.vertexIdClass);
+ fromByteString(neighbor.getNeighborId(), this.nbrId);
+
+ if (neighbor.hasEdgeValue()) {
+ this.edgeValue = DebuggerUtils
+ .newInstance(vertexScenarioClassesWrapper.edgeValueClass);
+ fromByteString(neighbor.getEdgeValue(), this.edgeValue);
+ } else {
+ this.edgeValue = null;
+ }
+ }
+ }
+
+ /**
+ * Class for capturing outgoing message.
+ */
+ public class OutgoingMessageWrapper extends BaseWrapper {
+ /**
+ * Destination vertex id.
+ */
+ private I destinationId;
+ /**
+ * Outgoing message.
+ */
+ private M2 message;
+
+ /**
+ * Constructor with the field values.
+ *
+ * @param destinationId Destination vertex id.
+ * @param message Outgoing message.
+ */
+ public OutgoingMessageWrapper(I destinationId, M2 message) {
+ this.setDestinationId(destinationId);
+ this.setMessage(message);
+ }
+
+ /**
+ * Default constructor.
+ */
+ public OutgoingMessageWrapper() {
+ }
+
+ public I getDestinationId() {
+ return destinationId;
+ }
+
+ public M2 getMessage() {
+ return message;
+ }
+
+ @Override
+ public String toString() {
+ return "outgoingMessage: destinationId: " + getDestinationId() +
+ " message: " + getMessage();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result +
+ (getDestinationId() == null ? 0 : getDestinationId().hashCode());
+ result = prime * result + (getMessage() == null ? 0 :
+ getMessage().hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ @SuppressWarnings("unchecked")
+ OutgoingMessageWrapper other = (OutgoingMessageWrapper) obj;
+ if (getDestinationId() == null) {
+ if (other.getDestinationId() != null) {
+ return false;
+ }
+ } else if (!getDestinationId().equals(other.getDestinationId())) {
+ return false;
+ }
+ if (getMessage() == null) {
+ if (other.getMessage() != null) {
+ return false;
+ }
+ } else if (!getMessage().equals(other.getMessage())) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public GeneratedMessage buildProtoObject() {
+ OutgoingMessage.Builder outgoingMessageBuilder = OutgoingMessage
+ .newBuilder();
+ outgoingMessageBuilder.setMsgData(toByteString(this.getMessage()));
+ outgoingMessageBuilder
+ .setDestinationId(toByteString(this.getDestinationId()));
+ return outgoingMessageBuilder.build();
+ }
+
+ @Override
+ public GeneratedMessage parseProtoFromInputStream(InputStream inputStream)
+ throws IOException {
+ return OutgoingMessage.parseFrom(inputStream);
+ }
+
+ @Override
+ public void loadFromProto(GeneratedMessage generatedMessage)
+ throws ClassNotFoundException, IOException, InstantiationException,
+ IllegalAccessException {
+ OutgoingMessage outgoingMessageProto = (OutgoingMessage)
+ generatedMessage;
+ this.setDestinationId(DebuggerUtils
+ .newInstance(getVertexScenarioClassesWrapper().vertexIdClass));
+ fromByteString(outgoingMessageProto.getDestinationId(),
+ getDestinationId());
+ this.setMessage(DebuggerUtils
+ .newInstance(getVertexScenarioClassesWrapper().outgoingMessageClass));
+ fromByteString(outgoingMessageProto.getMsgData(), this.getMessage());
+ }
+
+ /**
+ * @param destinationId the destinationId to set
+ */
+ public void setDestinationId(I destinationId) {
+ this.destinationId = destinationId;
+ }
+
+ /**
+ * @param message the message to set
+ */
+ public void setMessage(M2 message) {
+ this.message = message;
+ }
+ }
+
+ @Override
+ public GeneratedMessage buildProtoObject() {
+ VertexContext.Builder contextBuilder = VertexContext.newBuilder();
+ contextBuilder
+ .setCommonContext((CommonVertexMasterContext)
+ commonVertexMasterContextWrapper.buildProtoObject());
+ contextBuilder.setVertexId(toByteString(vertexIdWrapper));
+ if (vertexValueBeforeWrapper != null) {
+ contextBuilder
+ .setVertexValueBefore(toByteString(vertexValueBeforeWrapper));
+ }
+ if (vertexValueAfterWrapper != null) {
+ contextBuilder
+ .setVertexValueAfter(toByteString(vertexValueAfterWrapper));
+ }
+
+ for (GiraphVertexScenarioWrapper<I, V, E, M1, M2>.VertexContextWrapper.
+ NeighborWrapper neighborWrapper : neighborsWrapper) {
+ contextBuilder.addNeighbor((Neighbor) neighborWrapper
+ .buildProtoObject());
+ }
+
+ for (M1 msg : inMsgsWrapper) {
+ contextBuilder.addInMessage(toByteString(msg));
+ }
+
+ for (OutgoingMessageWrapper outgoingMessageWrapper : outMsgsWrapper) {
+ contextBuilder.addOutMessage((OutgoingMessage) outgoingMessageWrapper
+ .buildProtoObject());
+ }
+
+ return contextBuilder.build();
+ }
+
+ @Override
+ public GeneratedMessage parseProtoFromInputStream(InputStream inputStream)
+ throws IOException {
+ return VertexContext.parseFrom(inputStream);
+ }
+
+ @Override
+ public void loadFromProto(GeneratedMessage generatedMessage)
+ throws ClassNotFoundException, IOException, InstantiationException,
+ IllegalAccessException {
+ VertexContext context = (VertexContext) generatedMessage;
+
+ CommonVertexMasterContextWrapper vertexMasterContextWrapper = new
+ CommonVertexMasterContextWrapper();
+ vertexMasterContextWrapper
+ .loadFromProto(context.getCommonContext());
+ this.commonVertexMasterContextWrapper = vertexMasterContextWrapper;
+
+ I vertexId = DebuggerUtils
+ .newInstance(getVertexScenarioClassesWrapper().vertexIdClass);
+ fromByteString(context.getVertexId(), vertexId);
+ this.vertexIdWrapper = vertexId;
+
+ V vertexValueBefore = DebuggerUtils
+ .newInstance(getVertexScenarioClassesWrapper().vertexValueClass);
+ fromByteString(context.getVertexValueBefore(), vertexValueBefore);
+ this.vertexValueBeforeWrapper = vertexValueBefore;
+ if (context.hasVertexValueAfter()) {
+ V vertexValueAfter = DebuggerUtils
+ .newInstance(getVertexScenarioClassesWrapper().vertexValueClass);
+ fromByteString(context.getVertexValueAfter(), vertexValueAfter);
+ this.vertexValueAfterWrapper = vertexValueAfter;
+ }
+
+ for (Neighbor neighbor : context.getNeighborList()) {
+ NeighborWrapper neighborWrapper = new NeighborWrapper();
+ neighborWrapper.loadFromProto(neighbor);
+ this.neighborsWrapper.add(neighborWrapper);
+ }
+ for (int i = 0; i < context.getInMessageCount(); i++) {
+ M1 msg = DebuggerUtils
+ .newInstance(getVertexScenarioClassesWrapper().incomingMessageClass);
+ fromByteString(context.getInMessage(i), msg);
+ this.addIncomingMessageWrapper(msg);
+ }
+
+ for (OutgoingMessage outgoingMessageProto : context.getOutMessageList()) {
+ OutgoingMessageWrapper outgoingMessageWrapper = new
+ OutgoingMessageWrapper();
+ outgoingMessageWrapper.loadFromProto(outgoingMessageProto);
+ this.outMsgsWrapper.add(outgoingMessageWrapper);
+ }
+ }
+ }
+
+ /**
+ * Class for capturing the parameter classes used for Giraph Computation.
+ */
+ public class VertexScenarioClassesWrapper extends
+ BaseScenarioAndIntegrityWrapper<I> {
+ /**
+ * The Computation class.
+ */
+ private Class<?> classUnderTest;
+ /**
+ * The vertex value class.
+ */
+ private Class<V> vertexValueClass;
+ /**
+ * The edge value class.
+ */
+ private Class<E> edgeValueClass;
+ /**
+ * The incoming message class.
+ */
+ private Class<M1> incomingMessageClass;
+ /**
+ * The outgoing message class.
+ */
+ private Class<M2> outgoingMessageClass;
+
+ /**
+ * Default constructor.
+ */
+ public VertexScenarioClassesWrapper() {
+ }
+
+ /**
+ * Constructor with field values.
+ *
+ * @param classUnderTest Computation class.
+ * @param vertexIdClass Vertex id class.
+ * @param vertexValueClass Vertex value class.
+ * @param edgeValueClass Edge value class.
+ * @param incomingMessageClass Incoming message class.
+ * @param outgoingMessageClass Outgoing message class.
+ */
+ public VertexScenarioClassesWrapper(
+ Class<? extends Computation<I, V, E, M1, M2>> classUnderTest,
+ Class<I> vertexIdClass, Class<V> vertexValueClass,
+ Class<E> edgeValueClass, Class<M1> incomingMessageClass,
+ Class<M2> outgoingMessageClass) {
+ super(vertexIdClass);
+ this.classUnderTest = classUnderTest;
+ this.vertexValueClass = vertexValueClass;
+ this.edgeValueClass = edgeValueClass;
+ this.incomingMessageClass = incomingMessageClass;
+ this.outgoingMessageClass = outgoingMessageClass;
+ }
+
+ public Class<?> getClassUnderTest() {
+ return classUnderTest;
+ }
+
+ public Class<V> getVertexValueClass() {
+ return vertexValueClass;
+ }
+
+ public Class<E> getEdgeValueClass() {
+ return edgeValueClass;
+ }
+
+ public Class<M1> getIncomingMessageClass() {
+ return incomingMessageClass;
+ }
+
+ public Class<M2> getOutgoingMessageClass() {
+ return outgoingMessageClass;
+ }
+
+ @Override
+ public GeneratedMessage buildProtoObject() {
+ VertexScenarioClasses.Builder vertexScenarioClassesBuilder =
+ VertexScenarioClasses.newBuilder();
+ vertexScenarioClassesBuilder.setClassUnderTest(getClassUnderTest()
+ .getName());
+ vertexScenarioClassesBuilder.setVertexIdClass(getVertexIdClass()
+ .getName());
+ vertexScenarioClassesBuilder.setVertexValueClass(getVertexValueClass()
+ .getName());
+ vertexScenarioClassesBuilder.setEdgeValueClass(getEdgeValueClass()
+ .getName());
+ vertexScenarioClassesBuilder
+ .setIncomingMessageClass(getIncomingMessageClass().getName());
+ vertexScenarioClassesBuilder
+ .setOutgoingMessageClass(getOutgoingMessageClass().getName());
+ return vertexScenarioClassesBuilder.build();
+ }
+
+ @Override
+ public GeneratedMessage parseProtoFromInputStream(InputStream inputStream)
+ throws IOException {
+ return VertexScenarioClasses.parseFrom(inputStream);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void loadFromProto(GeneratedMessage generatedMessage)
+ throws ClassNotFoundException, IOException, InstantiationException,
+ IllegalAccessException {
+ VertexScenarioClasses vertexScenarioClass = (VertexScenarioClasses)
+ generatedMessage;
+ Class<?> clazz = Class.forName(vertexScenarioClass.getClassUnderTest());
+ this.classUnderTest = castClassToUpperBound(clazz, Computation.class);
+ this.vertexIdClass = (Class<I>) castClassToUpperBound(
+ Class.forName(vertexScenarioClass.getVertexIdClass()),
+ WritableComparable.class);
+ this.vertexValueClass = (Class<V>) castClassToUpperBound(
+ Class.forName(vertexScenarioClass.getVertexValueClass()),
+ Writable.class);
+ this.edgeValueClass = (Class<E>) castClassToUpperBound(
+ Class.forName(vertexScenarioClass.getEdgeValueClass()), Writable.class);
+ this.incomingMessageClass = (Class<M1>) castClassToUpperBound(
+ Class.forName(vertexScenarioClass.getIncomingMessageClass()),
+ Writable.class);
+ this.outgoingMessageClass = (Class<M2>) castClassToUpperBound(
+ Class.forName(vertexScenarioClass.getOutgoingMessageClass()),
+ Writable.class);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(super.toString());
+ stringBuilder.append("\nclassUnderTest: " +
+ getClassUnderTest().getCanonicalName());
+ stringBuilder.append("\nvertexValueClass: " +
+ getVertexValueClass().getCanonicalName());
+ stringBuilder.append("\nincomingMessageClass: " +
+ getIncomingMessageClass().getCanonicalName());
+ stringBuilder.append("\noutgoingMessageClass: " +
+ getOutgoingMessageClass().getCanonicalName());
+ return stringBuilder.toString();
+ }
+
+ }
+
+ @Override
+ public void loadFromProto(GeneratedMessage generatedMessage)
+ throws ClassNotFoundException, IOException, InstantiationException,
+ IllegalAccessException {
+ GiraphVertexScenario giraphScenario = (GiraphVertexScenario)
+ generatedMessage;
+ this.vertexScenarioClassesWrapper = new VertexScenarioClassesWrapper();
+ this.vertexScenarioClassesWrapper.loadFromProto(giraphScenario
+ .getVertexScenarioClasses());
+
+ this.contextWrapper = new VertexContextWrapper();
+ this.contextWrapper.loadFromProto(giraphScenario.getContext());
+
+ if (giraphScenario.hasException()) {
+ this.exceptionWrapper = new ExceptionWrapper();
+ this.exceptionWrapper.loadFromProto(giraphScenario.getException());
+ }
+ }
+
+ @Override
+ public GeneratedMessage buildProtoObject() {
+ GiraphVertexScenario.Builder giraphScenarioBuilder = GiraphVertexScenario
+ .newBuilder();
+ giraphScenarioBuilder
+ .setVertexScenarioClasses((VertexScenarioClasses)
+ vertexScenarioClassesWrapper.buildProtoObject());
+ giraphScenarioBuilder.setContext((VertexContext) contextWrapper
+ .buildProtoObject());
+ if (hasExceptionWrapper()) {
+ giraphScenarioBuilder.setException((Exception) exceptionWrapper
+ .buildProtoObject());
+ }
+ GiraphVertexScenario giraphScenario = giraphScenarioBuilder.build();
+ return giraphScenario;
+ }
+
+ @Override
+ public GeneratedMessage parseProtoFromInputStream(InputStream inputStream)
+ throws IOException {
+ return GiraphVertexScenario.parseFrom(inputStream);
+ }
+
+ public VertexScenarioClassesWrapper getVertexScenarioClassesWrapper() {
+ return vertexScenarioClassesWrapper;
+ }
+
+ public void setVertexScenarioClassesWrapper(
+ VertexScenarioClassesWrapper vertexScenarioClassesWrapper) {
+ this.vertexScenarioClassesWrapper = vertexScenarioClassesWrapper;
+ }
+}