You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/02/22 05:03:28 UTC
[1/2] GIRAPH-453: Pure Hive I/O (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/package-info.java
new file mode 100644
index 0000000..092ea39
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Hive Edge input related things.
+ */
+package org.apache.giraph.hive.input.edge;
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/package-info.java
new file mode 100644
index 0000000..7f4e5d6
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Hive input things.
+ */
+package org.apache.giraph.hive.input;
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
new file mode 100644
index 0000000..31f0e64
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveReadableRecord;
+import com.facebook.giraph.hive.HiveTableSchemaAware;
+
+/**
+ * Converts rows read from a Hive table into Giraph vertices.
+ * <p>
+ * An implementation populates a vertex (including its value) from a single
+ * Hive record. It extends {@link HiveTableSchemaAware} so that it can be
+ * handed the schema of the table being read.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ */
+public interface HiveToVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends HiveTableSchemaAware {
+  /**
+   * Populate the given vertex using the contents of a Hive record.
+   *
+   * @param record Hive record supplying the data to read.
+   * @param vertex Vertex instance to populate.
+   */
+  void fillVertex(HiveReadableRecord record, Vertex<I, V, E, ?> vertex);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
new file mode 100644
index 0000000..1d43055
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.hive.common.HiveProfiles;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.input.HiveApiInputFormat;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link VertexInputFormat} for reading vertices from Hive.
+ * <p>
+ * Splits and record readers come from an underlying
+ * {@link HiveApiInputFormat} tagged with
+ * {@link HiveProfiles#VERTEX_INPUT_PROFILE_ID}. Each Hive record is turned
+ * into a vertex by a {@link HiveVertexReader}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class HiveVertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends VertexInputFormat<I, V, E, M> {
+  /** Underlying Hive InputFormat used */
+  private final HiveApiInputFormat hiveInputFormat;
+
+  /**
+   * Create vertex input format
+   */
+  public HiveVertexInputFormat() {
+    hiveInputFormat = new HiveApiInputFormat();
+    // Tag the delegate with the vertex-input profile id.
+    hiveInputFormat.setMyProfileId(HiveProfiles.VERTEX_INPUT_PROFILE_ID);
+  }
+
+  /**
+   * Get the input splits computed by the underlying Hive input format.
+   * The requested number of workers is not used.
+   *
+   * @param context Job context
+   * @param numWorkers Number of workers (ignored)
+   * @return Splits from Hive
+   * @throws IOException on I/O errors
+   * @throws InterruptedException if interrupted
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    return hiveInputFormat.getSplits(context);
+  }
+
+  /**
+   * Create a {@link HiveVertexReader} for the split. The reader is given the
+   * table schema and a Hive record reader, and is initialized before it is
+   * returned.
+   *
+   * @param split Split to read
+   * @param context Task context
+   * @return Initialized vertex reader
+   * @throws IOException on I/O errors, or if interrupted while creating the
+   *         reader (the thread's interrupt status is preserved)
+   */
+  @Override
+  public VertexReader<I, V, E, M> createVertexReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    Configuration conf = context.getConfiguration();
+
+    HiveVertexReader<I, V, E, M> reader = new HiveVertexReader<I, V, E, M>();
+    reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
+
+    try {
+      RecordReader<WritableComparable, HiveRecord> baseReader =
+          hiveInputFormat.createRecordReader(split, context);
+      reader.setHiveRecordReader(baseReader);
+      reader.initialize(split, context);
+    } catch (InterruptedException e) {
+      // We can only throw IOException here, so re-assert the interrupt
+      // before translating it; otherwise callers lose the interruption.
+      Thread.currentThread().interrupt();
+      throw new IOException("Could not create vertex reader", e);
+    }
+    return reader;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
new file mode 100644
index 0000000..c5974de
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.HiveTableSchema;
+import com.facebook.giraph.hive.HiveTableSchemaAware;
+
+import java.io.IOException;
+
+/**
+ * {@link VertexReader} that reads vertices from Hive.
+ * <p>
+ * Records come from an underlying Hive {@link RecordReader}, set with
+ * {@link #setHiveRecordReader}. Each record is converted into a vertex by a
+ * user-supplied {@link HiveToVertex}. That class is read from the
+ * configuration key {@link #HIVE_TO_VERTEX_KEY}.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public class HiveVertexReader<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements VertexReader<I, V, E, M>, HiveTableSchemaAware {
+  /** Key in Configuration for HiveToVertex class */
+  public static final String HIVE_TO_VERTEX_KEY = "giraph.hive.to.vertex.class";
+
+  /** Underlying Hive RecordReader used */
+  private RecordReader<WritableComparable, HiveRecord> hiveRecordReader;
+  /** Schema for table in Hive */
+  private HiveTableSchema tableSchema;
+
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+
+  /** User class to create vertices from a HiveRecord */
+  private HiveToVertex<I, V, E> hiveToVertex;
+
+  /**
+   * Get underlying Hive record reader used.
+   *
+   * @return RecordReader from Hive.
+   */
+  public RecordReader<WritableComparable, HiveRecord> getHiveRecordReader() {
+    return hiveRecordReader;
+  }
+
+  /**
+   * Set underlying Hive record reader used. Must be called before
+   * {@link #initialize}.
+   *
+   * @param hiveRecordReader RecordReader to read from Hive.
+   */
+  public void setHiveRecordReader(
+      RecordReader<WritableComparable, HiveRecord> hiveRecordReader) {
+    this.hiveRecordReader = hiveRecordReader;
+  }
+
+  @Override
+  public HiveTableSchema getTableSchema() {
+    return tableSchema;
+  }
+
+  @Override
+  public void setTableSchema(HiveTableSchema tableSchema) {
+    this.tableSchema = tableSchema;
+  }
+
+  /**
+   * Get our Configuration.
+   *
+   * @return ImmutableClassesGiraphConfiguration, or null before
+   *         {@link #initialize} has run
+   */
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return conf;
+  }
+
+  /**
+   * Initialize the underlying Hive record reader, build the Giraph
+   * configuration, and instantiate the user's {@link HiveToVertex}.
+   * The table schema should already be set, because it is handed to the
+   * {@link HiveToVertex} here.
+   *
+   * @param inputSplit Split to read.
+   * @param context Task context supplying the configuration.
+   * @throws IOException if the Hive reader fails to initialize or
+   *         {@link #HIVE_TO_VERTEX_KEY} is not set.
+   * @throws InterruptedException if interrupted while initializing.
+   * @throws IllegalStateException if no Hive record reader has been set.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    if (hiveRecordReader == null) {
+      throw new IllegalStateException(
+          "initialize: Hive record reader not set, call setHiveRecordReader");
+    }
+    hiveRecordReader.initialize(inputSplit, context);
+    conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+        context.getConfiguration());
+    instantiateHiveToVertexFromConf();
+  }
+
+  /**
+   * Instantiate the user's {@link HiveToVertex} from our configuration and
+   * hand it the table schema.
+   *
+   * @throws IOException if {@link #HIVE_TO_VERTEX_KEY} is not set in conf.
+   */
+  private void instantiateHiveToVertexFromConf() throws IOException {
+    Class<? extends HiveToVertex> klass = conf.getClass(HIVE_TO_VERTEX_KEY,
+        null, HiveToVertex.class);
+    if (klass == null) {
+      throw new IOException(HIVE_TO_VERTEX_KEY + " not set in conf");
+    }
+    hiveToVertex = ReflectionUtils.newInstance(klass, conf);
+    hiveToVertex.setTableSchema(tableSchema);
+  }
+
+  @Override
+  public boolean nextVertex() throws IOException, InterruptedException {
+    return hiveRecordReader.nextKeyValue();
+  }
+
+  @Override
+  public void close() throws IOException {
+    hiveRecordReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return hiveRecordReader.getProgress();
+  }
+
+  /**
+   * Build a new vertex from the current Hive record. A fresh vertex is
+   * created on every call and filled by the user's {@link HiveToVertex}.
+   *
+   * @return Vertex for the current record
+   * @throws IOException on I/O errors
+   * @throws InterruptedException if interrupted
+   */
+  @Override
+  public final Vertex<I, V, E, M> getCurrentVertex()
+    throws IOException, InterruptedException {
+    HiveRecord hiveRecord = hiveRecordReader.getCurrentValue();
+    Vertex<I, V, E, M> vertex = conf.createVertex();
+    hiveToVertex.fillVertex(hiveRecord, vertex);
+    return vertex;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java
new file mode 100644
index 0000000..9027962
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Hive vertex input related things.
+ */
+package org.apache.giraph.hive.input.vertex;
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
new file mode 100644
index 0000000..641a298
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.output;
+
+import org.apache.giraph.hive.common.HiveProfiles;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.output.HiveApiOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * {@link VertexOutputFormat} that writes vertices to Hive.
+ * <p>
+ * Output specs, committers and record writers come from an underlying
+ * {@link HiveApiOutputFormat} tagged with
+ * {@link HiveProfiles#VERTEX_OUTPUT_PROFILE_ID}. Vertices are turned into
+ * Hive records by a {@link HiveVertexWriter}.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ */
+public class HiveVertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends VertexOutputFormat<I, V, E> {
+  /** Underlying Hive OutputFormat used */
+  private final HiveApiOutputFormat hiveOutputFormat;
+
+  /**
+   * Create vertex output format
+   */
+  public HiveVertexOutputFormat() {
+    hiveOutputFormat = new HiveApiOutputFormat();
+    // Tag the delegate with the vertex-output profile id.
+    hiveOutputFormat.setMyProfileId(HiveProfiles.VERTEX_OUTPUT_PROFILE_ID);
+  }
+
+  /**
+   * Create a {@link HiveVertexWriter}, give it a Hive record writer and the
+   * output table's schema, and initialize it.
+   *
+   * @param context Task context
+   * @return Initialized vertex writer
+   * @throws IOException on I/O errors or if the writer cannot be initialized
+   * @throws InterruptedException if interrupted
+   */
+  @Override
+  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+
+    RecordWriter<WritableComparable, HiveRecord> baseWriter =
+        hiveOutputFormat.getRecordWriter(context);
+    HiveVertexWriter<I, V, E> writer = new HiveVertexWriter<I, V, E>();
+    writer.setBaseWriter(baseWriter);
+    writer.setTableSchema(hiveOutputFormat.getTableSchema(conf));
+    writer.initialize(context);
+    return writer;
+  }
+
+  /**
+   * Delegate output-spec validation to the underlying Hive output format.
+   *
+   * @param context Job context
+   * @throws IOException on I/O errors or invalid output specs
+   * @throws InterruptedException if interrupted
+   */
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    hiveOutputFormat.checkOutputSpecs(context);
+  }
+
+  /**
+   * Get the output committer of the underlying Hive output format.
+   *
+   * @param context Task context
+   * @return Hive output committer
+   * @throws IOException on I/O errors
+   * @throws InterruptedException if interrupted
+   */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return hiveOutputFormat.getOutputCommitter(context);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
new file mode 100644
index 0000000..ea24fc5
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.output;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.HiveTableSchema;
+import com.facebook.giraph.hive.impl.HiveApiRecord;
+
+import java.io.IOException;
+
+/**
+ * {@link VertexWriter} that writes vertices to Hive.
+ * <p>
+ * Each vertex is converted into a Hive record by a user-supplied
+ * {@link VertexToHive}. That class is read from the configuration key
+ * {@link #VERTEX_TO_HIVE_KEY}. The record is then passed to the underlying
+ * Hive {@link RecordWriter}.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ */
+public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
+    E extends Writable> implements VertexWriter<I, V, E> {
+  /** Key in configuration for VertexToHive class */
+  public static final String VERTEX_TO_HIVE_KEY = "giraph.vertex.to.hive.class";
+
+  /** Logger (NOTE(review): not currently used in this class) */
+  private static final Logger LOG = Logger.getLogger(HiveVertexWriter.class);
+
+  /** Underlying Hive RecordWriter used */
+  private RecordWriter<WritableComparable, HiveRecord> hiveRecordWriter;
+  /** Schema of the Hive table being written to */
+  private HiveTableSchema tableSchema;
+
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, V, E, ?> conf;
+
+  /** User class to write vertices into a HiveRecord */
+  private VertexToHive<I, V, E> vertexToHive;
+
+  /**
+   * Get underlying Hive record writer used.
+   *
+   * @return RecordWriter for Hive.
+   */
+  public RecordWriter<WritableComparable, HiveRecord> getBaseWriter() {
+    return hiveRecordWriter;
+  }
+
+  /**
+   * Set underlying Hive record writer used.
+   *
+   * @param hiveRecordWriter RecordWriter to write to Hive.
+   */
+  public void setBaseWriter(
+      RecordWriter<WritableComparable, HiveRecord> hiveRecordWriter) {
+    this.hiveRecordWriter = hiveRecordWriter;
+  }
+
+  /**
+   * Get Hive table schema for the table being written to.
+   *
+   * @return Hive table schema for table
+   */
+  public HiveTableSchema getTableSchema() {
+    return tableSchema;
+  }
+
+  /**
+   * Set Hive schema for the table being written to. This should be called
+   * before {@link #initialize}, which hands the schema to the
+   * {@link VertexToHive}.
+   *
+   * @param tableSchema Hive table schema
+   */
+  public void setTableSchema(HiveTableSchema tableSchema) {
+    this.tableSchema = tableSchema;
+  }
+
+  /**
+   * Build the Giraph configuration and instantiate the user's
+   * {@link VertexToHive}.
+   *
+   * @param context Task context supplying the configuration
+   * @throws IOException if {@link #VERTEX_TO_HIVE_KEY} is not set
+   * @throws InterruptedException if interrupted
+   */
+  @Override
+  public void initialize(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    conf = new ImmutableClassesGiraphConfiguration<I, V, E, Writable>(
+        context.getConfiguration());
+    instantiateVertexToHiveFromConf();
+  }
+
+  /**
+   * Initialize VertexToHive instance from our configuration and hand it
+   * the table schema.
+   *
+   * @throws IOException if {@link #VERTEX_TO_HIVE_KEY} is not set in conf
+   */
+  private void instantiateVertexToHiveFromConf() throws IOException {
+    Class<? extends VertexToHive> klass = conf.getClass(VERTEX_TO_HIVE_KEY,
+        null, VertexToHive.class);
+    if (klass == null) {
+      throw new IOException(VERTEX_TO_HIVE_KEY + " not set in conf");
+    }
+    vertexToHive = ReflectionUtils.newInstance(klass, conf);
+    vertexToHive.setTableSchema(tableSchema);
+  }
+
+  /**
+   * Write one vertex. A fresh record is created with one slot per table
+   * column, filled by the {@link VertexToHive}, and written with a
+   * {@link NullWritable} key.
+   *
+   * @param vertex Vertex to write
+   * @throws IOException on I/O errors
+   * @throws InterruptedException if interrupted
+   */
+  @Override
+  public void writeVertex(Vertex<I, V, E, ?> vertex)
+    throws IOException, InterruptedException {
+    HiveRecord record = new HiveApiRecord(tableSchema.numColumns());
+    vertexToHive.fillRecord(vertex, record);
+    hiveRecordWriter.write(NullWritable.get(), record);
+  }
+
+  @Override
+  public void close(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    hiveRecordWriter.close(context);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
new file mode 100644
index 0000000..a72cf0b
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.output;
+
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.giraph.hive.HiveTableSchemaAware;
+import com.facebook.giraph.hive.HiveWritableRecord;
+
+/**
+ * Converts Giraph vertices into rows of a Hive table.
+ * <p>
+ * It extends {@link HiveTableSchemaAware} so that implementations can be
+ * handed the schema of the table being written.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ */
+public interface VertexToHive<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends HiveTableSchemaAware {
+  /**
+   * Copy data from a vertex into a writable Hive record.
+   *
+   * @param vertex Vertex supplying the data.
+   * @param record Hive record to populate.
+   */
+  void fillRecord(Vertex<I, V, E, ?> vertex, HiveWritableRecord record);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/output/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/package-info.java
new file mode 100644
index 0000000..65d87e3
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hive output things.
+ */
+package org.apache.giraph.hive.output;
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/giraph-hive/src/main/java/org/apache/giraph/hive/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/package-info.java
new file mode 100644
index 0000000..d828d2a
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Giraph Hive I/O related things.
+ */
+package org.apache.giraph.hive;
http://git-wip-us.apache.org/repos/asf/giraph/blob/92517430/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c075762..4be0bd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -235,7 +235,8 @@ under the License.
<buildtype>test</buildtype>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hbase.version>0.90.5</hbase.version>
- <jackson.version>1.8.0</jackson.version>
+ <codehaus-jackson.version>1.8.0</codehaus-jackson.version>
+ <fasterxml-jackson.version>2.1.0</fasterxml-jackson.version>
<slf4j.version>1.7.2</slf4j.version>
<hive.version>0.10.0</hive.version>
<forHadoop>for-hadoop-${hadoop.version}</forHadoop>
@@ -836,6 +838,11 @@ under the License.
<dependencies>
<!-- compile dependencies. sorted lexicographically. -->
<dependency>
+ <groupId>com.facebook.giraph.hive</groupId>
+ <artifactId>hive-io-experimental</artifactId>
+ <version>0.1</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>12.0</version>
@@ -843,7 +850,7 @@ under the License.
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
- <version>2.1.2</version>
+ <version>2.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -852,6 +859,21 @@ under the License.
</exclusions>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${fasterxml-jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${fasterxml-jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.spullara.cli-parser</groupId>
+ <artifactId>cli-parser</artifactId>
+ <version>1.1</version>
+ </dependency>
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
@@ -877,6 +899,16 @@ under the License.
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-hcatalog</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-hive</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hcatalog</groupId>
<artifactId>hcatalog-core</artifactId>
<version>0.5.0-incubating</version>
@@ -929,12 +961,12 @@ under the License.
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
- <version>${jackson.version}</version>
+ <version>${codehaus-jackson.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
- <version>${jackson.version}</version>
+ <version>${codehaus-jackson.version}</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
@@ -1030,6 +1062,12 @@ under the License.
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-core</artifactId>
+ <type>test-jar</type>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<type>test-jar</type>
@@ -1047,6 +1085,7 @@ under the License.
<modules>
<module>giraph-core</module>
+ <module>giraph-hive</module>
<module>giraph-examples</module>
</modules>