You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/03/24 21:45:51 UTC
[1/5] git commit: GIRAPH-566: Make option for aggregators to be configurable (majakabiljo)
Updated Branches:
refs/heads/perf 882545d1c -> ed8b54eee
GIRAPH-566: Make option for aggregators to be configurable (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f4bd1996
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f4bd1996
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f4bd1996
Branch: refs/heads/perf
Commit: f4bd1996e66fbe3f4daf8aca1e7be1e9cead5dee
Parents: a802fef
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Thu Mar 21 15:49:12 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Thu Mar 21 15:49:12 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 2 ++
.../java/org/apache/giraph/comm/ServerData.java | 4 ++--
.../giraph/comm/aggregators/AggregatorUtils.java | 15 +++++----------
.../comm/aggregators/AllAggregatorServerData.java | 12 +++++++++---
.../aggregators/OwnerAggregatorServerData.java | 10 ++++++++--
.../giraph/worker/WorkerAggregatorHandler.java | 3 ++-
6 files changed, 28 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 3032873..a1446a7 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-566: Make option for aggregators to be configurable (majakabiljo)
+
GIRAPH-575: update hive-io (nitay)
GIRAPH-576: BspServiceMaster.failureCleanup() shouldn't pass null in
http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index e6dff8c..70dc156 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -107,8 +107,8 @@ public class ServerData<I extends WritableComparable,
new SimplePartitionStore<I, V, E, M>(configuration, context);
}
edgeStore = new EdgeStore<I, V, E, M>(service, configuration, context);
- ownerAggregatorData = new OwnerAggregatorServerData(context);
- allAggregatorData = new AllAggregatorServerData(context);
+ ownerAggregatorData = new OwnerAggregatorServerData(context, configuration);
+ allAggregatorData = new AllAggregatorServerData(context, configuration);
}
public EdgeStore<I, V, E, M> getEdgeStore() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
index 0abd7e1..ceb30a8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
@@ -20,6 +20,7 @@ package org.apache.giraph.comm.aggregators;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
@@ -78,19 +79,13 @@ public class AggregatorUtils {
* catch all exceptions.
*
* @param aggregatorClass Class of aggregator
+ * @param conf Configuration
* @return New aggregator
*/
public static Aggregator<Writable> newAggregatorInstance(
- Class<Aggregator<Writable>> aggregatorClass) {
- try {
- return aggregatorClass.newInstance();
- } catch (InstantiationException e) {
- throw new IllegalStateException("createAggregator: " +
- "InstantiationException for aggregator class " + aggregatorClass, e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException("createAggregator: " +
- "IllegalAccessException for aggregator class " + aggregatorClass, e);
- }
+ Class<Aggregator<Writable>> aggregatorClass,
+ ImmutableClassesGiraphConfiguration conf) {
+ return ReflectionUtils.newInstance(aggregatorClass, conf);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
index f38c6cd..177e738 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
@@ -19,6 +19,7 @@
package org.apache.giraph.comm.aggregators;
import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.utils.TaskIdsPermitsBarrier;
import org.apache.hadoop.io.Writable;
@@ -83,14 +84,19 @@ public class AllAggregatorServerData {
private final TaskIdsPermitsBarrier workersBarrier;
/** Progressable used to report progress */
private final Progressable progressable;
+ /** Configuration */
+ private final ImmutableClassesGiraphConfiguration conf;
/**
* Constructor
*
* @param progressable Progressable used to report progress
+ * @param conf Configuration
*/
- public AllAggregatorServerData(Progressable progressable) {
+ public AllAggregatorServerData(Progressable progressable,
+ ImmutableClassesGiraphConfiguration conf) {
this.progressable = progressable;
+ this.conf = conf;
workersBarrier = new TaskIdsPermitsBarrier(progressable);
masterBarrier = new TaskIdsPermitsBarrier(progressable);
}
@@ -106,7 +112,7 @@ public class AllAggregatorServerData {
aggregatorClassMap.put(name, aggregatorClass);
if (!aggregatorTypesMap.containsKey(aggregatorClass)) {
aggregatorTypesMap.putIfAbsent(aggregatorClass,
- AggregatorUtils.newAggregatorInstance(aggregatorClass));
+ AggregatorUtils.newAggregatorInstance(aggregatorClass, conf));
}
progressable.progress();
}
@@ -225,7 +231,7 @@ public class AllAggregatorServerData {
currentAggregatorMap.get(entry.getKey());
if (aggregator == null) {
currentAggregatorMap.put(entry.getKey(),
- AggregatorUtils.newAggregatorInstance(entry.getValue()));
+ AggregatorUtils.newAggregatorInstance(entry.getValue(), conf));
} else {
aggregator.reset();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
index bd6068a..eb25a2e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
@@ -19,6 +19,7 @@
package org.apache.giraph.comm.aggregators;
import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.TaskIdsPermitsBarrier;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
@@ -72,14 +73,19 @@ public class OwnerAggregatorServerData {
private final TaskIdsPermitsBarrier workersBarrier;
/** Progressable used to report progress */
private final Progressable progressable;
+ /** Configuration */
+ private final ImmutableClassesGiraphConfiguration conf;
/**
* Constructor
*
* @param progressable Progressable used to report progress
+ * @param conf Configuration
*/
- public OwnerAggregatorServerData(Progressable progressable) {
+ public OwnerAggregatorServerData(Progressable progressable,
+ ImmutableClassesGiraphConfiguration conf) {
this.progressable = progressable;
+ this.conf = conf;
workersBarrier = new TaskIdsPermitsBarrier(progressable);
}
@@ -95,7 +101,7 @@ public class OwnerAggregatorServerData {
LOG.debug("registerAggregator: The first registration after a reset()");
}
myAggregatorMap.putIfAbsent(name,
- AggregatorUtils.newAggregatorInstance(aggregatorClass));
+ AggregatorUtils.newAggregatorInstance(aggregatorClass, conf));
progressable.progress();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 3c18449..9a8a8b8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -290,7 +290,8 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
WorkerAggregatorHandler.this.currentAggregatorMap.entrySet()) {
threadAggregatorMap.put(entry.getKey(),
AggregatorUtils.newAggregatorInstance(
- (Class<Aggregator<Writable>>) entry.getValue().getClass()));
+ (Class<Aggregator<Writable>>) entry.getValue().getClass(),
+ conf));
}
}
[2/5] git commit: GIRAPH-510: Remove HBase Cruft
Posted by ni...@apache.org.
GIRAPH-510: Remove HBase Cruft
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/15668279
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/15668279
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/15668279
Branch: refs/heads/perf
Commit: 156682791504c2e433382551e1be301e1d9146cb
Parents: f4bd199
Author: Nitay Joffe <ni...@fb.com>
Authored: Thu Mar 21 22:03:03 2013 -0400
Committer: Nitay Joffe <ni...@fb.com>
Committed: Thu Mar 21 22:08:45 2013 -0400
----------------------------------------------------------------------
CHANGELOG | 2 +
.../io/hbase/TestHBaseRootMarkerVertextFormat.java | 70 +++++++++++----
2 files changed, 55 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/15668279/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index a1446a7..db8f4ce 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-510: Remove HBase Cruft (kelarini via nitay)
+
GIRAPH-566: Make option for aggregators to be configurable (majakabiljo)
GIRAPH-575: update hive-io (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/15668279/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java
index 4d9588a..7415ddc 100644
--- a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java
+++ b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/TestHBaseRootMarkerVertextFormat.java
@@ -29,8 +29,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Get;
@@ -51,6 +53,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.util.UUID;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -65,7 +68,7 @@ public class TestHBaseRootMarkerVertextFormat extends BspCase {
*
* @param testName name of the test case
*/
- private HBaseTestingUtility testUtil = new HBaseTestingUtility();
+
private final Logger log = Logger.getLogger(TestHBaseRootMarkerVertextFormat.class);
private final String TABLE_NAME = "simple_graph";
@@ -73,12 +76,33 @@ public class TestHBaseRootMarkerVertextFormat extends BspCase {
private final String QUALIFER = "children";
private final String OUTPUT_FIELD = "parent";
+ private HBaseTestingUtility testUtil;
+ private Path hbaseRootdir;
+
+
public TestHBaseRootMarkerVertextFormat() {
super(TestHBaseRootMarkerVertextFormat.class.getName());
+
+ // Let's set up the hbase root directory.
+ Configuration conf = HBaseConfiguration.create();
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ String randomStr = UUID.randomUUID().toString();
+ String tmpdir = System.getProperty("java.io.tmpdir") +
+ randomStr + "/";
+ hbaseRootdir = fs.makeQualified(new Path(tmpdir));
+ conf.set(HConstants.HBASE_DIR, hbaseRootdir.toString());
+ fs.mkdirs(hbaseRootdir);
+ } catch(IOException ioe) {
+ fail("Could not create hbase root directory.");
+ }
+
+ // Start the test utility.
+ testUtil = new HBaseTestingUtility(conf);
}
@Test
- public void testHBaseInputOutput() throws Exception{
+ public void testHBaseInputOutput() throws Exception {
if (System.getProperty("prop.mapred.job.tracker") != null) {
if(log.isInfoEnabled())
@@ -93,7 +117,7 @@ public class TestHBaseRootMarkerVertextFormat extends BspCase {
"Make sure you built the main Giraph artifact?.");
}
- String INPUT_FILE = "graph.csv";
+ String INPUT_FILE = hbaseRootdir.toString() + "/graph.csv";
//First let's load some data using ImportTsv into our mock table.
String[] args = new String[] {
"-Dimporttsv.columns=HBASE_ROW_KEY,cf:"+QUALIFER,
@@ -102,17 +126,23 @@ public class TestHBaseRootMarkerVertextFormat extends BspCase {
INPUT_FILE
};
- MiniZooKeeperCluster zkCluster = testUtil.startMiniZKCluster();
- MiniHBaseCluster cluster = testUtil.startMiniHBaseCluster(2, 2);
-
- GenericOptionsParser opts =
- new GenericOptionsParser(cluster.getConfiguration(), args);
- Configuration conf = opts.getConfiguration();
- args = opts.getRemainingArgs();
+ MiniHBaseCluster cluster = null;
+ MiniZooKeeperCluster zkCluster = null;
+ FileSystem fs = null;
try {
-
- FileSystem fs = FileSystem.get(conf);
+ // using the restart method allows us to avoid having the hbase
+ // root directory overwritten by /home/$username
+ zkCluster = testUtil.startMiniZKCluster();
+ testUtil.restartHBaseCluster(2);
+ cluster = testUtil.getMiniHBaseCluster();
+
+ GenericOptionsParser opts =
+ new GenericOptionsParser(cluster.getConfiguration(), args);
+ Configuration conf = opts.getConfiguration();
+ args = opts.getRemainingArgs();
+
+ fs = FileSystem.get(conf);
FSDataOutputStream op = fs.create(new Path(INPUT_FILE), true);
String line1 = "0001,0002\n";
String line2 = "0002,0004\n";
@@ -132,8 +162,7 @@ public class TestHBaseRootMarkerVertextFormat extends BspCase {
HTableDescriptor desc = new HTableDescriptor(TAB);
desc.addFamily(new HColumnDescriptor(FAM));
HBaseAdmin hbaseAdmin=new HBaseAdmin(conf);
- if (hbaseAdmin.isTableAvailable(TABLE_NAME))
- {
+ if (hbaseAdmin.isTableAvailable(TABLE_NAME)) {
hbaseAdmin.disableTable(TABLE_NAME);
hbaseAdmin.deleteTable(TABLE_NAME);
}
@@ -171,10 +200,17 @@ public class TestHBaseRootMarkerVertextFormat extends BspCase {
assertNotNull(parentBytes);
assertTrue(parentBytes.length > 0);
Assert.assertEquals("0001", Bytes.toString(parentBytes));
-
} finally {
- cluster.shutdown();
- zkCluster.shutdown();
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ if (zkCluster != null) {
+ zkCluster.shutdown();
+ }
+ // clean test files
+ if (fs != null) {
+ fs.delete(hbaseRootdir);
+ }
}
}
[5/5] git commit: Merge remote-tracking branch 'origin/trunk' into perf
Posted by ni...@apache.org.
Merge remote-tracking branch 'origin/trunk' into perf
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ed8b54ee
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ed8b54ee
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ed8b54ee
Branch: refs/heads/perf
Commit: ed8b54eee95c012c9ffd7bc464776b05846c6308
Parents: 370b42b a4d3330
Author: Nitay Joffe <ni...@apache.org>
Authored: Sun Mar 24 16:45:25 2013 -0400
Committer: Nitay Joffe <ni...@apache.org>
Committed: Sun Mar 24 16:45:25 2013 -0400
----------------------------------------------------------------------
CHANGELOG | 7 ++
.../java/org/apache/giraph/comm/ServerData.java | 4 +-
.../giraph/comm/aggregators/AggregatorUtils.java | 15 +--
.../comm/aggregators/AllAggregatorServerData.java | 12 ++-
.../aggregators/OwnerAggregatorServerData.java | 10 ++-
.../giraph/worker/WorkerAggregatorHandler.java | 3 +-
.../io/hbase/TestHBaseRootMarkerVertextFormat.java | 70 +++++++++++----
.../org/apache/giraph/hive/HiveGiraphRunner.java | 6 +-
8 files changed, 90 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/ed8b54ee/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
[4/5] git commit: Hide impl types
Posted by ni...@apache.org.
Hide impl types
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/370b42b2
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/370b42b2
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/370b42b2
Branch: refs/heads/perf
Commit: 370b42b2551fb0dcfe4b8df141947ff5a8d1fa5b
Parents: 882545d
Author: Nitay Joffe <ni...@apache.org>
Authored: Sun Mar 24 15:20:07 2013 -0400
Committer: Nitay Joffe <ni...@apache.org>
Committed: Sun Mar 24 15:20:07 2013 -0400
----------------------------------------------------------------------
.../org/apache/giraph/hive/HiveGiraphRunner.java | 4 ++--
.../hive/input/edge/HiveEdgeInputFormat.java | 4 ++--
.../giraph/hive/input/edge/HiveEdgeReader.java | 11 ++++++-----
.../hive/input/vertex/HiveVertexInputFormat.java | 5 +++--
.../giraph/hive/input/vertex/HiveVertexReader.java | 14 ++++++++------
.../giraph/hive/output/HiveVertexWriter.java | 15 +++++++--------
.../apache/giraph/hive/output/VertexToHive.java | 2 +-
7 files changed, 29 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/370b42b2/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 8788a9d..2be09a8 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -275,13 +275,13 @@ public class HiveGiraphRunner implements Tool {
*/
private void setupHiveInputs(GiraphConfiguration conf) throws TException {
if (hiveToVertexClass != null) {
- HiveApiInputFormat.initProfile(conf, hiveVertexInputDescription,
+ HiveApiInputFormat.setProfileInputDesc(conf, hiveVertexInputDescription,
HiveProfiles.VERTEX_INPUT_PROFILE_ID);
conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
}
if (hiveToEdgeClass != null) {
- HiveApiInputFormat.initProfile(conf, hiveEdgeInputDescription,
+ HiveApiInputFormat.setProfileInputDesc(conf, hiveEdgeInputDescription,
HiveProfiles.EDGE_INPUT_PROFILE_ID);
conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/370b42b2/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
index d8aa460..d5590c7 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.facebook.giraph.hive.input.HiveApiInputFormat;
-import com.facebook.giraph.hive.record.HiveRecord;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
import java.io.IOException;
import java.util.List;
@@ -69,7 +69,7 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
HiveEdgeReader<I, E> reader = new HiveEdgeReader<I, E>();
reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
- RecordReader<WritableComparable, HiveRecord> baseReader;
+ RecordReader<WritableComparable, HiveReadableRecord> baseReader;
try {
baseReader = hiveInputFormat.createRecordReader(split, context);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/370b42b2/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
index 65b9bbf..14de914 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.facebook.giraph.hive.record.HiveRecord;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
import com.facebook.giraph.hive.schema.HiveTableSchema;
import com.facebook.giraph.hive.schema.HiveTableSchemas;
@@ -49,7 +49,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
public static final String REUSE_EDGE_KEY = "giraph.hive.reuse.edge";
/** Underlying Hive RecordReader used */
- private RecordReader<WritableComparable, HiveRecord> hiveRecordReader;
+ private RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader;
/** Schema for table in Hive */
private HiveTableSchema tableSchema;
@@ -69,7 +69,8 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
*
* @return RecordReader from Hive
*/
- public RecordReader<WritableComparable, HiveRecord> getHiveRecordReader() {
+ public RecordReader<WritableComparable, HiveReadableRecord>
+ getHiveRecordReader() {
return hiveRecordReader;
}
@@ -79,7 +80,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
* @param hiveRecordReader RecordReader to read from Hive.
*/
public void setHiveRecordReader(
- RecordReader<WritableComparable, HiveRecord> hiveRecordReader) {
+ RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader) {
this.hiveRecordReader = hiveRecordReader;
}
@@ -159,7 +160,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
@Override
public Edge<I, E> getCurrentEdge() throws IOException,
InterruptedException {
- HiveRecord record = hiveRecordReader.getCurrentValue();
+ HiveReadableRecord record = hiveRecordReader.getCurrentValue();
ReusableEdge<I, E> edge = edgeToReuse;
if (edge == null) {
edge = conf.createReusableEdge();
http://git-wip-us.apache.org/repos/asf/giraph/blob/370b42b2/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
index 25c7a26..35e8810 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -26,10 +26,11 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.facebook.giraph.hive.impl.input.HiveApiRecordReader;
import com.facebook.giraph.hive.input.HiveApiInputFormat;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
import java.io.IOException;
import java.util.List;
@@ -70,7 +71,7 @@ public class HiveVertexInputFormat<I extends WritableComparable,
HiveVertexReader reader = new HiveVertexReader();
reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
- HiveApiRecordReader baseReader;
+ RecordReader<WritableComparable, HiveReadableRecord> baseReader;
try {
baseReader = hiveInputFormat.createRecordReader(split, context);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/370b42b2/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
index 11c57ad..c0425dd 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
@@ -26,10 +26,10 @@ import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.facebook.giraph.hive.impl.input.HiveApiRecordReader;
-import com.facebook.giraph.hive.record.HiveRecord;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
import com.facebook.giraph.hive.schema.HiveTableSchema;
import com.facebook.giraph.hive.schema.HiveTableSchemaAware;
import com.facebook.giraph.hive.schema.HiveTableSchemas;
@@ -57,7 +57,7 @@ public class HiveVertexReader<I extends WritableComparable,
public static final String REUSE_VERTEX_KEY = "giraph.hive.reuse.vertex";
/** Underlying Hive RecordReader used */
- private HiveApiRecordReader hiveRecordReader;
+ private RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader;
/** Schema for table in Hive */
private HiveTableSchema tableSchema;
@@ -80,7 +80,8 @@ public class HiveVertexReader<I extends WritableComparable,
*
* @return RecordReader from Hive.
*/
- public HiveApiRecordReader getHiveRecordReader() {
+ public RecordReader<WritableComparable, HiveReadableRecord>
+ getHiveRecordReader() {
return hiveRecordReader;
}
@@ -89,7 +90,8 @@ public class HiveVertexReader<I extends WritableComparable,
*
* @param hiveRecordReader RecordReader to read from Hive.
*/
- public void setHiveRecordReader(HiveApiRecordReader hiveRecordReader) {
+ public void setHiveRecordReader(
+ RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader) {
this.hiveRecordReader = hiveRecordReader;
}
@@ -172,7 +174,7 @@ public class HiveVertexReader<I extends WritableComparable,
@Override
public final Vertex<I, V, E, M> getCurrentVertex()
throws IOException, InterruptedException {
- HiveRecord hiveRecord = hiveRecordReader.getCurrentValue();
+ HiveReadableRecord hiveRecord = hiveRecordReader.getCurrentValue();
Vertex<I, V, E, M> vertex = vertexToReuse;
if (vertex == null) {
vertex = conf.createVertex();
http://git-wip-us.apache.org/repos/asf/giraph/blob/370b42b2/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
index 5356424..ffdf775 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
@@ -29,13 +29,12 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
-import com.facebook.giraph.hive.impl.HiveApiRecord;
-import com.facebook.giraph.hive.record.HiveRecord;
+import com.facebook.giraph.hive.record.HiveRecordFactory;
+import com.facebook.giraph.hive.record.HiveWritableRecord;
import com.facebook.giraph.hive.schema.HiveTableSchema;
import com.facebook.giraph.hive.schema.HiveTableSchemas;
import java.io.IOException;
-import java.util.Collections;
/**
* Vertex writer using Hive.
@@ -53,7 +52,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
private static final Logger LOG = Logger.getLogger(HiveVertexWriter.class);
/** Underlying Hive RecordWriter used */
- private RecordWriter<WritableComparable, HiveRecord> hiveRecordWriter;
+ private RecordWriter<WritableComparable, HiveWritableRecord> hiveRecordWriter;
/** Schema for table in Hive */
private HiveTableSchema tableSchema;
@@ -68,7 +67,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
*
* @return RecordWriter for Hive.
*/
- public RecordWriter<WritableComparable, HiveRecord> getBaseWriter() {
+ public RecordWriter<WritableComparable, HiveWritableRecord> getBaseWriter() {
return hiveRecordWriter;
}
@@ -78,7 +77,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
* @param hiveRecordWriter RecordWriter to write to Hive.
*/
public void setBaseWriter(
- RecordWriter<WritableComparable, HiveRecord> hiveRecordWriter) {
+ RecordWriter<WritableComparable, HiveWritableRecord> hiveRecordWriter) {
this.hiveRecordWriter = hiveRecordWriter;
}
@@ -125,8 +124,8 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
@Override
public void writeVertex(Vertex<I, V, E, ?> vertex)
throws IOException, InterruptedException {
- HiveRecord record = new HiveApiRecord(tableSchema.numColumns(),
- Collections.<String>emptyList());
+ HiveWritableRecord record =
+ HiveRecordFactory.newWritableRecord(tableSchema.numColumns());
vertexToHive.fillRecord(vertex, record);
hiveRecordWriter.write(NullWritable.get(), record);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/370b42b2/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
index 6d323bd..51649a5 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
@@ -22,7 +22,7 @@ import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import com.facebook.giraph.hive.HiveWritableRecord;
+import com.facebook.giraph.hive.record.HiveWritableRecord;
/**
* Interface for writing vertices to a Hive record.
[3/5] git commit: GIRAPH-580: NPE in HiveGiraphRunner when the vertex output format is not defined (aching)
Posted by ni...@apache.org.
GIRAPH-580: NPE in HiveGiraphRunner when the vertex output format is
not defined (aching)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a4d33303
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a4d33303
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a4d33303
Branch: refs/heads/perf
Commit: a4d33303969c060e02b9bc76b90c3ba06e9ba9e3
Parents: 1566827
Author: Avery Ching <ac...@fb.com>
Authored: Fri Mar 22 09:45:40 2013 -0700
Committer: Avery Ching <ac...@fb.com>
Committed: Fri Mar 22 09:56:33 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 3 +++
.../org/apache/giraph/hive/HiveGiraphRunner.java | 6 ++++--
2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/a4d33303/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index db8f4ce..50ed932 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-580: NPE in HiveGiraphRunner when the vertex output format is
+ not defined (aching)
+
GIRAPH-510: Remove HBase Cruft (kelarini via nitay)
GIRAPH-566: Make option for aggregators to be configurable (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/a4d33303/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 8788a9d..efc08d3 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -680,8 +680,10 @@ public class HiveGiraphRunner implements Tool {
LOG.info(LOG_PREFIX + "-outputPartition=\"" +
hiveOutputDescription.getPartitionValues() + "\"");
}
- LOG.info(LOG_PREFIX + "-outputFormatClass=" +
- classes.getVertexOutputFormatClass().getCanonicalName());
+ if (classes.getVertexOutputFormatClass() != null) {
+ LOG.info(LOG_PREFIX + "-outputFormatClass=" +
+ classes.getVertexOutputFormatClass().getCanonicalName());
+ }
LOG.info(LOG_PREFIX + "-workers=" + workers);
}