You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/02 20:04:01 UTC
[3/4] GIRAPH-458: split formats module into accumulo, hbase,
hcatalog (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
deleted file mode 100644
index 2e91cba..0000000
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hcatalog;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
-import org.apache.hcatalog.mapreduce.HCatSplit;
-import org.apache.hcatalog.mapreduce.HCatStorageHandler;
-import org.apache.hcatalog.mapreduce.HCatUtils;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
-import org.apache.hcatalog.mapreduce.PartInfo;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Provides functionality similar to
- * {@link org.apache.hcatalog.mapreduce.HCatInputFormat},
- * but allows for different data sources (vertex and edge data).
- */
-public class GiraphHCatInputFormat extends HCatBaseInputFormat {
- /** Vertex input job info for HCatalog. */
- public static final String VERTEX_INPUT_JOB_INFO =
- "giraph.hcat.vertex.input.job.info";
- /** Edge input job info for HCatalog. */
- public static final String EDGE_INPUT_JOB_INFO =
- "giraph.hcat.edge.input.job.info";
-
- /**
-  * Record the vertex {@link InputJobInfo} in the job configuration.
-  * <p>
-  * The database name, table name, filter and properties of the given info
-  * are copied into a fresh {@link InputJobInfo}, which is passed through
-  * {@code HCatUtils.getInputJobInfo} and stored in serialized form under
-  * {@link #VERTEX_INPUT_JOB_INFO}.
-  *
-  * @param job The job
-  * @param inputJobInfo Vertex input job info
-  * @throws IOException
-  */
- public static void setVertexInput(Job job,
-                                   InputJobInfo inputJobInfo)
-   throws IOException {
-   Configuration conf = job.getConfiguration();
-   InputJobInfo copy = InputJobInfo.create(
-       inputJobInfo.getDatabaseName(),
-       inputJobInfo.getTableName(),
-       inputJobInfo.getFilter());
-   copy.getProperties().putAll(inputJobInfo.getProperties());
-   String serialized =
-       HCatUtil.serialize(HCatUtils.getInputJobInfo(conf, copy));
-   conf.set(VERTEX_INPUT_JOB_INFO, serialized);
- }
-
- /**
-  * Record the edge {@link InputJobInfo} in the job configuration.
-  * <p>
-  * The database name, table name, filter and properties of the given info
-  * are copied into a fresh {@link InputJobInfo}, which is passed through
-  * {@code HCatUtils.getInputJobInfo} and stored in serialized form under
-  * {@link #EDGE_INPUT_JOB_INFO}.
-  *
-  * @param job The job
-  * @param inputJobInfo Edge input job info
-  * @throws IOException
-  */
- public static void setEdgeInput(Job job,
-                                 InputJobInfo inputJobInfo)
-   throws IOException {
-   Configuration conf = job.getConfiguration();
-   InputJobInfo copy = InputJobInfo.create(
-       inputJobInfo.getDatabaseName(),
-       inputJobInfo.getTableName(),
-       inputJobInfo.getFilter());
-   copy.getProperties().putAll(inputJobInfo.getProperties());
-   String serialized =
-       HCatUtil.serialize(HCatUtils.getInputJobInfo(conf, copy));
-   conf.set(EDGE_INPUT_JOB_INFO, serialized);
- }
-
- /**
-  * Build the full table schema for an input job: all data columns followed
-  * by all partition columns, in that order.
-  *
-  * @param inputJobInfo Input job info
-  * @return Input table schema
-  * @throws IOException
-  */
- private static HCatSchema getTableSchema(InputJobInfo inputJobInfo)
-   throws IOException {
-   HCatSchema schema = new HCatSchema(new LinkedList<HCatFieldSchema>());
-
-   List<HCatFieldSchema> dataFields =
-       inputJobInfo.getTableInfo().getDataColumns().getFields();
-   for (HCatFieldSchema dataField : dataFields) {
-     schema.append(dataField);
-   }
-
-   List<HCatFieldSchema> partitionFields =
-       inputJobInfo.getTableInfo().getPartitionColumns().getFields();
-   for (HCatFieldSchema partitionField : partitionFields) {
-     schema.append(partitionField);
-   }
-
-   return schema;
- }
-
- /**
-  * Look up the vertex input job info stored in the configuration and build
-  * its table schema.
-  *
-  * @param conf Job configuration
-  * @return Vertex input table schema
-  * @throws IOException
-  */
- public static HCatSchema getVertexTableSchema(Configuration conf)
-   throws IOException {
-   InputJobInfo vertexJobInfo = getVertexJobInfo(conf);
-   return getTableSchema(vertexJobInfo);
- }
-
- /**
-  * Look up the edge input job info stored in the configuration and build
-  * its table schema.
-  *
-  * @param conf Job configuration
-  * @return Edge input table schema
-  * @throws IOException
-  */
- public static HCatSchema getEdgeTableSchema(Configuration conf)
-   throws IOException {
-   InputJobInfo edgeJobInfo = getEdgeJobInfo(conf);
-   return getTableSchema(edgeJobInfo);
- }
-
- /**
-  * Set input path for job.
-  * <p>
-  * The location string is split on commas that are not enclosed in curly
-  * braces. Each resulting path is qualified against the job's file system,
-  * escaped, and the comma-joined result is stored under
-  * {@code mapred.input.dir}.
-  *
-  * @param jobConf Job configuration
-  * @param location Location of input files
-  * @throws IOException
-  */
- private void setInputPath(JobConf jobConf, String location)
-   throws IOException {
-   List<String> pieces = new ArrayList<String>();
-   final int end = location.length();
-   int braceDepth = 0;
-   int pieceStart = 0;
-   boolean insideGlob = false;
-
-   for (int pos = 0; pos < end; pos++) {
-     char current = location.charAt(pos);
-     if (current == '{') {
-       braceDepth++;
-       insideGlob = true;
-     } else if (current == '}') {
-       braceDepth--;
-       if (braceDepth == 0) {
-         insideGlob = false;
-       }
-     } else if (current == ',' && !insideGlob) {
-       // A comma outside of a {...} group separates two paths.
-       pieces.add(location.substring(pieceStart, pos));
-       pieceStart = pos + 1;
-     }
-   }
-   // Whatever follows the last separator is the final path.
-   pieces.add(location.substring(pieceStart));
-
-   Path[] paths = StringUtils.stringToPath(pieces.toArray(new String[0]));
-   FileSystem fs = FileSystem.get(jobConf);
-
-   StringBuilder joined = new StringBuilder();
-   boolean first = true;
-   for (Path unqualified : paths) {
-     if (!first) {
-       joined.append(StringUtils.COMMA_STR);
-     }
-     first = false;
-     Path qualified = unqualified.makeQualified(fs);
-     joined.append(StringUtils.escapeString(qualified.toString()));
-   }
-
-   jobConf.set("mapred.input.dir", joined.toString());
- }
-
- /**
-  * Get input splits for job.
-  * <p>
-  * For every partition of the input job info, the partition's underlying
-  * (old-API) input format is asked for its splits and each of them is
-  * wrapped in an {@link HCatSplit} carrying the full table schema.
-  *
-  * @param jobContext Job context
-  * @param inputJobInfo Input job info
-  * @return List of input splits; empty when the input job info has no
-  *         partitions
-  * @throws IOException
-  * @throws InterruptedException
-  */
- private List<InputSplit> getSplits(JobContext jobContext,
-                                    InputJobInfo inputJobInfo)
-   throws IOException, InterruptedException {
-   Configuration conf = jobContext.getConfiguration();
-
-   List<InputSplit> splits = new ArrayList<InputSplit>();
-   List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
-   if (partitionInfoList == null) {
-     // No partitions match the specified partition filter
-     return splits;
-   }
-
-   // For each matching partition, call getSplits on the underlying
-   // InputFormat
-   for (PartInfo partitionInfo : partitionInfoList) {
-     JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
-     setInputPath(jobConf, partitionInfo.getLocation());
-     Map<String, String> jobProperties = partitionInfo.getJobProperties();
-
-     // Data columns followed by partition columns; built per partition so
-     // that splits of different partitions do not share a schema instance.
-     HCatSchema allCols = getTableSchema(inputJobInfo);
-
-     HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
-
-     HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
-         jobConf, partitionInfo);
-
-     // Get the input format
-     Class inputFormatClass = storageHandler.getInputFormatClass();
-     org.apache.hadoop.mapred.InputFormat inputFormat =
-         getMapRedInputFormat(jobConf, inputFormatClass);
-
-     // Call getSplit on the InputFormat, create an HCatSplit for each
-     // underlying split. When the desired number of input splits is missing,
-     // use a default number (denoted by zero).
-     // TODO: Currently each partition is split independently into
-     // a desired number. However, we want the union of all partitions to be
-     // split into a desired number while maintaining balanced sizes of input
-     // splits.
-     int desiredNumSplits =
-         conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0);
-     org.apache.hadoop.mapred.InputSplit[] baseSplits =
-         inputFormat.getSplits(jobConf, desiredNumSplits);
-
-     for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
-       splits.add(new HCatSplit(partitionInfo, split, allCols));
-     }
-   }
-
-   return splits;
- }
-
- /**
-  * Read back the vertex {@link InputJobInfo} stored under
-  * {@link #VERTEX_INPUT_JOB_INFO}.
-  *
-  * @param conf Configuration
-  * @return Vertex input job info
-  * @throws IOException if no vertex job info is present in the configuration
-  */
- private static InputJobInfo getVertexJobInfo(Configuration conf)
-   throws IOException {
-   String serialized = conf.get(VERTEX_INPUT_JOB_INFO);
-   if (serialized != null) {
-     return (InputJobInfo) HCatUtil.deserialize(serialized);
-   }
-   throw new IOException("Vertex job information not found in JobContext." +
-       " GiraphHCatInputFormat.setVertexInput() not called?");
- }
-
- /**
-  * Read back the edge {@link InputJobInfo} stored under
-  * {@link #EDGE_INPUT_JOB_INFO}.
-  *
-  * @param conf Configuration
-  * @return Edge input job info
-  * @throws IOException if no edge job info is present in the configuration
-  */
- private static InputJobInfo getEdgeJobInfo(Configuration conf)
-   throws IOException {
-   String serialized = conf.get(EDGE_INPUT_JOB_INFO);
-   if (serialized != null) {
-     return (InputJobInfo) HCatUtil.deserialize(serialized);
-   }
-   throw new IOException("Edge job information not found in JobContext." +
-       " GiraphHCatInputFormat.setEdgeInput() not called?");
- }
-
- /**
-  * Compute the input splits for the vertex input configured on the job.
-  *
-  * @param jobContext Job context
-  * @return List of vertex {@link InputSplit}s
-  * @throws IOException
-  * @throws InterruptedException
-  */
- public List<InputSplit> getVertexSplits(JobContext jobContext)
-   throws IOException, InterruptedException {
-   InputJobInfo vertexJobInfo =
-       getVertexJobInfo(jobContext.getConfiguration());
-   return getSplits(jobContext, vertexJobInfo);
- }
-
- /**
-  * Compute the input splits for the edge input configured on the job.
-  *
-  * @param jobContext Job context
-  * @return List of edge {@link InputSplit}s
-  * @throws IOException
-  * @throws InterruptedException
-  */
- public List<InputSplit> getEdgeSplits(JobContext jobContext)
-   throws IOException, InterruptedException {
-   InputJobInfo edgeJobInfo =
-       getEdgeJobInfo(jobContext.getConfiguration());
-   return getSplits(jobContext, edgeJobInfo);
- }
-
- /**
- * Create an {@link org.apache.hcatalog.mapreduce.HCatRecordReader}.
- * <p>
- * The storage handler is resolved for the partition the split belongs to and
- * is handed to {@code HCatUtils.newHCatReader} together with the values of
- * the schema columns that are absent from the partition's own schema.
- *
- * @param split Input split
- * @param schema Table schema
- * @param taskContext Context
- * @return Record reader
- * @throws IOException
- * @throws InterruptedException
- */
- private RecordReader<WritableComparable, HCatRecord>
- createRecordReader(InputSplit split,
- HCatSchema schema,
- TaskAttemptContext taskContext)
- throws IOException, InterruptedException {
- HCatSplit hcatSplit = HCatUtils.castToHCatSplit(split);
- PartInfo partitionInfo = hcatSplit.getPartitionInfo();
- JobContext jobContext = taskContext;
- Configuration conf = jobContext.getConfiguration();
-
- HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
- conf, partitionInfo);
-
- // NOTE(review): jobConf receives the partition's job properties but is not
- // read again in this method. Confirm whether these calls are needed for a
- // side effect or are leftover dead code.
- JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
- Map<String, String> jobProperties = partitionInfo.getJobProperties();
- HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
-
- // Columns requested by the schema that the partition's data does not carry
- // (mapped to their partition value, or null when there is none).
- Map<String, String> valuesNotInDataCols = getColValsNotInDataColumns(
- schema, partitionInfo);
-
- return HCatUtils.newHCatReader(storageHandler, valuesNotInDataCols);
- }
-
- /**
-  * Create a {@link RecordReader} for vertices, using the vertex table
-  * schema taken from the task's configuration.
-  *
-  * @param split Input split
-  * @param taskContext Context
-  * @return Record reader
-  * @throws IOException
-  * @throws InterruptedException
-  */
- public RecordReader<WritableComparable, HCatRecord>
- createVertexRecordReader(InputSplit split, TaskAttemptContext taskContext)
-   throws IOException, InterruptedException {
-   HCatSchema vertexSchema =
-       getVertexTableSchema(taskContext.getConfiguration());
-   return createRecordReader(split, vertexSchema, taskContext);
- }
-
- /**
-  * Create a {@link RecordReader} for edges, using the edge table schema
-  * taken from the task's configuration.
-  *
-  * @param split Input split
-  * @param taskContext Context
-  * @return Record reader
-  * @throws IOException
-  * @throws InterruptedException
-  */
- public RecordReader<WritableComparable, HCatRecord>
- createEdgeRecordReader(InputSplit split, TaskAttemptContext taskContext)
-   throws IOException, InterruptedException {
-   HCatSchema edgeSchema =
-       getEdgeTableSchema(taskContext.getConfiguration());
-   return createRecordReader(split, edgeSchema, taskContext);
- }
-
- /**
-  * Get values for fields requested by output schema which will not be in the
-  * data.
-  * <p>
-  * Every field of the output schema that has no position in the partition's
-  * schema is mapped to its partition value, or to {@code null} when the
-  * partition has no value for it.
-  *
-  * @param outputSchema Output schema
-  * @param partInfo Partition info
-  * @return Values not in data columns, keyed by field name
-  */
- private static Map<String, String> getColValsNotInDataColumns(
-     HCatSchema outputSchema,
-     PartInfo partInfo) {
-   HCatSchema dataSchema = partInfo.getPartitionSchema();
-   Map<String, String> partitionValues = partInfo.getPartitionValues();
-   Map<String, String> vals = new HashMap<String, String>();
-   for (String fieldName : outputSchema.getFieldNames()) {
-     if (dataSchema.getPosition(fieldName) == null) {
-       // This output field is not present in the data schema, so take its
-       // value from the partition values. Map.get yields null when the
-       // field is not a partition column either, which is exactly the value
-       // we want to record in that case.
-       vals.put(fieldName, partitionValues.get(fieldName));
-     }
-   }
-   return vals;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
deleted file mode 100644
index 2112df3..0000000
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hcatalog;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeInputFormat;
-import org.apache.giraph.graph.EdgeReader;
-import org.apache.giraph.graph.EdgeWithSource;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.data.HCatRecord;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * HCatalog {@link EdgeInputFormat} for reading edges from Hive/Pig.
- *
- * @param <I> Vertex id
- * @param <E> Edge value
- */
-public abstract class HCatalogEdgeInputFormat<
- I extends WritableComparable,
- E extends Writable>
- extends EdgeInputFormat<I, E> {
- /**
- * HCatalog input format.
- */
- private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
-
- @Override
- public final List<InputSplit> getSplits(JobContext context, int numWorkers)
-   throws IOException, InterruptedException {
-   // numWorkers is not consulted; splitting is delegated to the HCatalog
-   // input format.
-   List<InputSplit> edgeSplits = hCatInputFormat.getEdgeSplits(context);
-   return edgeSplits;
- }
-
- /**
-  * {@link EdgeReader} for {@link HCatalogEdgeInputFormat}.
-  * <p>
-  * Wraps the record reader created by the enclosing format's
-  * {@link GiraphHCatInputFormat}; subclasses turn the current record into an
-  * edge.
-  */
- protected abstract class HCatalogEdgeReader implements EdgeReader<I, E> {
-   /** Context passed to initialize. */
-   private TaskAttemptContext taskContext;
-   /** Internal {@link RecordReader}. */
-   private RecordReader<WritableComparable, HCatRecord> reader;
-
-   @Override
-   public final void initialize(InputSplit inputSplit,
-                                TaskAttemptContext context)
-     throws IOException, InterruptedException {
-     reader = hCatInputFormat.createEdgeRecordReader(inputSplit, context);
-     reader.initialize(inputSplit, context);
-     taskContext = context;
-   }
-
-   @Override
-   public boolean nextEdge() throws IOException, InterruptedException {
-     // One record per edge: advance the underlying reader by one record.
-     return reader.nextKeyValue();
-   }
-
-   @Override
-   public final void close() throws IOException {
-     reader.close();
-   }
-
-   @Override
-   public final float getProgress() throws IOException, InterruptedException {
-     return reader.getProgress();
-   }
-
-   /**
-    * Get the record reader.
-    *
-    * @return Record reader to be used for reading.
-    */
-   protected final RecordReader<WritableComparable, HCatRecord>
-   getRecordReader() {
-     return reader;
-   }
-
-   /**
-    * Get the context.
-    *
-    * @return Context passed to initialize.
-    */
-   protected final TaskAttemptContext getContext() {
-     return taskContext;
-   }
- }
-
- /**
- * Create {@link EdgeReader}.
- * <p>
- * The returned reader is initialized by
- * {@link #createEdgeReader(InputSplit, TaskAttemptContext)}.
- *
- * @return {@link HCatalogEdgeReader} instance.
- */
- protected abstract HCatalogEdgeReader createEdgeReader();
-
- /**
-  * Create an {@link EdgeReader} via {@link #createEdgeReader()} and
-  * initialize it with the given split and context.
-  *
-  * @param split Input split
-  * @param context Context
-  * @return Initialized edge reader
-  * @throws IOException
-  * @throws IllegalStateException if interrupted while initializing the
-  *         reader; the thread's interrupt status is restored first
-  */
- @Override
- public EdgeReader<I, E>
- createEdgeReader(InputSplit split, TaskAttemptContext context)
-   throws IOException {
-   try {
-     HCatalogEdgeReader reader = createEdgeReader();
-     reader.initialize(split, context);
-     return reader;
-   } catch (InterruptedException e) {
-     // Catching InterruptedException clears the interrupt flag; set it again
-     // so callers further up can still observe the interruption.
-     Thread.currentThread().interrupt();
-     throw new IllegalStateException(
-         "createEdgeReader: Interrupted creating reader.", e);
-   }
- }
-
- /**
-  * {@link HCatalogEdgeReader} for tables holding a complete edge
-  * in each row.
-  */
- protected abstract class SingleRowHCatalogEdgeReader
-     extends HCatalogEdgeReader {
-   /**
-    * Get source vertex id from a record.
-    *
-    * @param record Input record
-    * @return I Source vertex id
-    */
-   protected abstract I getSourceVertexId(HCatRecord record);
-
-   /**
-    * Get target vertex id from a record.
-    *
-    * @param record Input record
-    * @return I Target vertex id
-    */
-   protected abstract I getTargetVertexId(HCatRecord record);
-
-   /**
-    * Get edge value from a record.
-    *
-    * @param record Input record
-    * @return E Edge value
-    */
-   protected abstract E getEdgeValue(HCatRecord record);
-
-   @Override
-   public EdgeWithSource<I, E> getCurrentEdge() throws IOException,
-       InterruptedException {
-     HCatRecord row = getRecordReader().getCurrentValue();
-     // Extract in the order source, target, value.
-     I sourceId = getSourceVertexId(row);
-     I targetId = getTargetVertexId(row);
-     E value = getEdgeValue(row);
-     Edge<I, E> edge = new Edge<I, E>(targetId, value);
-     return new EdgeWithSource<I, E>(sourceId, edge);
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
deleted file mode 100644
index ec49137..0000000
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hcatalog;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.giraph.graph.VertexReader;
-import org.apache.giraph.utils.TimedLogger;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Abstract class that users should subclass to load data from a Hive or Pig
- * table. You can easily implement a {@link HCatalogVertexReader} by extending
- * either {@link SingleRowHCatalogVertexReader} or
- * {@link MultiRowHCatalogVertexReader} depending on how data for each vertex is
- * stored in the input table.
- * <p>
- * The desired database and table name to load from can be specified via
- * {@link GiraphHCatInputFormat#setVertexInput(org.apache.hadoop.mapreduce.Job,
- * org.apache.hcatalog.mapreduce.InputJobInfo)}
- * as you setup your vertex input format with
- * {@link org.apache.giraph.conf.GiraphConfiguration#
- * setVertexInputFormatClass(Class)}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-
-@SuppressWarnings("rawtypes")
-public abstract class HCatalogVertexInputFormat<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- extends VertexInputFormat<I, V, E, M> {
- /**
- * HCatalog input format.
- */
- private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
-
- @Override
- public final List<InputSplit> getSplits(
-     final JobContext context, final int numWorkers)
-   throws IOException, InterruptedException {
-   // numWorkers is not consulted; splitting is delegated to the HCatalog
-   // input format.
-   List<InputSplit> vertexSplits = hCatInputFormat.getVertexSplits(context);
-   return vertexSplits;
- }
-
- /**
-  * Abstract class that users should subclass based on their specific vertex
-  * input. HCatRecord can be parsed to get the required data for implementing
-  * getCurrentVertex(). If the vertex spans more than one HCatRecord,
-  * nextVertex() should be overridden to handle that logic as well.
-  */
- protected abstract class HCatalogVertexReader implements
-     VertexReader<I, V, E, M> {
-   /** Internal HCatRecordReader. */
-   private RecordReader<WritableComparable, HCatRecord> reader;
-   /** Context passed to initialize. */
-   private TaskAttemptContext taskContext;
-   /** Giraph configuration */
-   private ImmutableClassesGiraphConfiguration<I, V, E, M> giraphConf;
-
-   /**
-    * Get the Giraph configuration.
-    *
-    * @return Configuration created in
-    *         {@link #initialize(InputSplit, TaskAttemptContext)}
-    */
-   public ImmutableClassesGiraphConfiguration<I, V, E, M> getConfiguration() {
-     return giraphConf;
-   }
-
-   /**
-    * Initialize with the HCatRecordReader.
-    *
-    * @param recordReader internal reader
-    */
-   private void initialize(
-       final RecordReader<WritableComparable, HCatRecord> recordReader) {
-     reader = recordReader;
-   }
-
-   @Override
-   public final void initialize(
-       final InputSplit inputSplit,
-       final TaskAttemptContext ctxt)
-     throws IOException, InterruptedException {
-     reader.initialize(inputSplit, ctxt);
-     taskContext = ctxt;
-     giraphConf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
-         ctxt.getConfiguration());
-   }
-
-   @Override
-   public boolean nextVertex() throws IOException, InterruptedException {
-     // Default: one row per vertex. Subclasses whose vertices span several
-     // rows override this.
-     return reader.nextKeyValue();
-   }
-
-   @Override
-   public final void close() throws IOException {
-     reader.close();
-   }
-
-   @Override
-   public final float getProgress() throws IOException, InterruptedException {
-     return reader.getProgress();
-   }
-
-   /**
-    * Get the record reader.
-    *
-    * @return Record reader to be used for reading.
-    */
-   protected final RecordReader<WritableComparable, HCatRecord>
-   getRecordReader() {
-     return reader;
-   }
-
-   /**
-    * Get the context.
-    *
-    * @return Context passed to initialize.
-    */
-   protected final TaskAttemptContext getContext() {
-     return taskContext;
-   }
- }
-
- /**
- * Create vertex reader instance.
- * <p>
- * {@link #createVertexReader(InputSplit, TaskAttemptContext)} supplies the
- * returned reader with its underlying record reader.
- *
- * @return HCatalogVertexReader
- */
- protected abstract HCatalogVertexReader createVertexReader();
-
- /**
-  * Create a {@link VertexReader} via {@link #createVertexReader()} and give
-  * it the HCatalog record reader for the split.
-  * <p>
-  * Only the record reader is attached here; the reader's
-  * {@code initialize(InputSplit, TaskAttemptContext)} is not invoked by this
-  * method.
-  *
-  * @param split Input split
-  * @param context Context
-  * @return Vertex reader
-  * @throws IOException
-  * @throws IllegalStateException if interrupted while creating the record
-  *         reader; the thread's interrupt status is restored first
-  */
- @Override
- public final VertexReader<I, V, E, M>
- createVertexReader(final InputSplit split,
-                    final TaskAttemptContext context)
-   throws IOException {
-   try {
-     HCatalogVertexReader reader = createVertexReader();
-     reader.initialize(hCatInputFormat.
-         createVertexRecordReader(split, context));
-     return reader;
-   } catch (InterruptedException e) {
-     // Catching InterruptedException clears the interrupt flag; set it again
-     // so callers further up can still observe the interruption.
-     Thread.currentThread().interrupt();
-     throw new IllegalStateException(
-         "createVertexReader: " +
-         "Interrupted creating reader.", e);
-   }
- }
-
- /**
-  * HCatalogVertexReader for tables holding complete vertex info within each
-  * row.
-  */
- protected abstract class SingleRowHCatalogVertexReader
-     extends HCatalogVertexReader {
-   /** 1024 const. */
-   private static final int BYTE_CONST = 1024;
-   /** Memory usage is considered for logging every this many records. */
-   private static final int RECORD_MOD_LIMIT = 1000;
-   /** Logger */
-   private final Logger log =
-       Logger.getLogger(SingleRowHCatalogVertexReader.class);
-   /** Timed logger to print every 30 seconds */
-   private final TimedLogger timedLogger = new TimedLogger(30 * 1000, log);
-   /** Number of records read so far. */
-   private int recordCount = 0;
-
-   /**
-    * Get vertex id.
-    *
-    * @param record hcat record
-    * @return I id
-    */
-   protected abstract I getVertexId(HCatRecord record);
-
-   /**
-    * Get vertex value.
-    *
-    * @param record hcat record
-    * @return V value
-    */
-   protected abstract V getVertexValue(HCatRecord record);
-
-   /**
-    * Get edges.
-    *
-    * @param record hcat record
-    * @return Edges
-    */
-   protected abstract Iterable<Edge<I, E>> getEdges(HCatRecord record);
-
-   @Override
-   public final Vertex<I, V, E, M> getCurrentVertex()
-     throws IOException, InterruptedException {
-     HCatRecord row = getRecordReader().getCurrentValue();
-     Vertex<I, V, E, M> result = getConfiguration().createVertex();
-     result.initialize(getVertexId(row), getVertexValue(row), getEdges(row));
-     ++recordCount;
-     if ((recordCount % RECORD_MOD_LIMIT) == 0 && log.isInfoEnabled()) {
-       // Report memory usage in gigabytes.
-       Runtime runtime = Runtime.getRuntime();
-       double gb = BYTE_CONST * BYTE_CONST * BYTE_CONST;
-       StringBuilder message = new StringBuilder();
-       message.append("read ").append(recordCount)
-           .append(" records. Memory: ");
-       message.append(runtime.totalMemory() / gb).append("GB total = ");
-       message.append((runtime.totalMemory() - runtime.freeMemory()) / gb)
-           .append("GB used + ");
-       message.append(runtime.freeMemory() / gb).append("GB free, ");
-       message.append(runtime.maxMemory() / gb).append("GB max");
-       timedLogger.info(message.toString());
-     }
-     return result;
-   }
- }
- /**
-  * HCatalogVertexReader for tables holding vertex info across multiple rows
-  * sorted by vertex id column, so that they appear consecutively to the
-  * RecordReader.
-  * <p>
-  * Each row contributes one edge (target vertex id and edge value) to the
-  * vertex identified by the row's vertex id; the vertex value is computed
-  * from all rows of the vertex.
-  */
- protected abstract class MultiRowHCatalogVertexReader extends
-     HCatalogVertexReader {
-   /** Modulus check counter. */
-   private static final int RECORD_MOD_LIMIT = 1000;
-   /** Logger */
-   private final Logger log =
-       Logger.getLogger(MultiRowHCatalogVertexReader.class);
-   /** Id of the vertex whose rows are currently being collected. */
-   private I currentVertexId = null;
-   /** Edges collected so far for the current vertex. */
-   private List<Edge<I, E>> currentEdges = Lists.newLinkedList();
-   /** Rows collected so far for the current vertex. */
-   private List<HCatRecord> recordsForVertex = Lists.newArrayList();
-   /** Number of vertices created so far. */
-   private int recordCount = 0;
-   /** Most recently completed vertex. */
-   private Vertex<I, V, E, M> vertex = null;
-   /** Timed logger to print every 30 seconds */
-   private final TimedLogger timedLogger = new TimedLogger(30 * 1000,
-       log);
-
-   /**
-    * Get vertex id from record.
-    *
-    * @param record hcat
-    * @return I vertex id
-    */
-   protected abstract I getVertexId(HCatRecord record);
-
-   /**
-    * Get vertex value from the records of one vertex.
-    *
-    * @param records all records belonging to the vertex
-    * @return V vertex value
-    */
-   protected abstract V getVertexValue(
-       Iterable<HCatRecord> records);
-
-   /**
-    * Get target vertex id from record.
-    *
-    * @param record hcat
-    * @return I vertex id of target.
-    */
-   protected abstract I getTargetVertexId(HCatRecord record);
-
-   /**
-    * Get edge value from record.
-    *
-    * @param record hcat.
-    * @return E edge value.
-    */
-   protected abstract E getEdgeValue(HCatRecord record);
-
-   @Override
-   public final Vertex<I, V, E, M>
-   getCurrentVertex() throws IOException, InterruptedException {
-     return vertex;
-   }
-
-   @Override
-   public boolean nextVertex() throws IOException, InterruptedException {
-     while (getRecordReader().nextKeyValue()) {
-       // NOTE(review): records are retained across nextKeyValue() calls;
-       // this presumes the record reader does not reuse the value object --
-       // confirm against the reader in use.
-       HCatRecord record = getRecordReader().getCurrentValue();
-       I recordVertexId = getVertexId(record);
-       if (currentVertexId == null) {
-         currentVertexId = recordVertexId;
-       }
-       if (currentVertexId.equals(recordVertexId)) {
-         addRecord(record);
-       } else {
-         // The row belongs to the next vertex: finish the current one, then
-         // start collecting for the new id. The row's edge must be kept as
-         // well, otherwise the first edge of every following vertex is lost.
-         createCurrentVertex();
-         if (log.isInfoEnabled() && (recordCount % RECORD_MOD_LIMIT) == 0) {
-           timedLogger.info("read " + recordCount);
-         }
-         currentVertexId = recordVertexId;
-         addRecord(record);
-         return true;
-       }
-     }
-
-     // Input exhausted: emit whatever has been collected for the last vertex.
-     if (recordsForVertex.isEmpty()) {
-       return false;
-     } else {
-       createCurrentVertex();
-       return true;
-     }
-   }
-
-   /**
-    * Add a row and the edge it describes to the vertex being collected.
-    *
-    * @param record row belonging to the current vertex
-    */
-   private void addRecord(HCatRecord record) {
-     currentEdges.add(new Edge<I, E>(
-         getTargetVertexId(record),
-         getEdgeValue(record)));
-     recordsForVertex.add(record);
-   }
-
-   /**
-    * Create the current vertex from the collected rows and edges and reset
-    * the collections for the next vertex.
-    */
-   private void createCurrentVertex() {
-     vertex = getConfiguration().createVertex();
-     vertex.initialize(currentVertexId, getVertexValue(recordsForVertex),
-         currentEdges);
-     // Start over with fresh lists instead of clearing the ones just handed
-     // out, in case the vertex or its value kept a reference to them.
-     currentEdges = Lists.newLinkedList();
-     recordsForVertex = Lists.newArrayList();
-     ++recordCount;
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexOutputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexOutputFormat.java
deleted file mode 100644
index 94c7b85..0000000
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexOutputFormat.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hcatalog;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexOutputFormat;
-import org.apache.giraph.graph.VertexWriter;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.data.DefaultHCatRecord;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-
-import java.io.IOException;
-
-/**
- * Abstract class that users should subclass to store data to Hive or Pig table.
- * You can easily implement a {@link HCatalogVertexWriter} by extending
- * {@link SingleRowHCatalogVertexWriter} or {@link MultiRowHCatalogVertexWriter}
- * depending on how you want to fit your vertices into the output table.
- * <p>
- * The desired database and table name to store to can be specified via
- * {@link HCatOutputFormat#setOutput(org.apache.hadoop.mapreduce.Job,
- * org.apache.hcatalog.mapreduce.OutputJobInfo)}
- * as you set up your vertex output format with
- * {@link org.apache.giraph.conf.GiraphConfiguration}'s
- * {@code setVertexOutputFormatClass(Class)}. You must create the output table.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public abstract class HCatalogVertexOutputFormat<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable>
- extends VertexOutputFormat<I, V, E> {
- /**
- * hcat output format
- */
- protected HCatOutputFormat hCatOutputFormat = new HCatOutputFormat();
-
- @Override
- public final void checkOutputSpecs(JobContext context) throws IOException,
- InterruptedException {
- hCatOutputFormat.checkOutputSpecs(context);
- }
-
- @Override
- public final OutputCommitter getOutputCommitter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return hCatOutputFormat.getOutputCommitter(context);
- }
-
- /**
- * Abstract class that users should
- * subclass based on their specific vertex
- * output. Users should implement
- * writeVertex to create a HCatRecord that is
- * valid for writing by HCatalogRecordWriter.
- */
- protected abstract class HCatalogVertexWriter implements
- VertexWriter<I, V, E> {
-
- /** Internal HCatRecordWriter */
- private RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter;
- /** Context passed to initialize */
- private TaskAttemptContext context;
-
- /**
- * Initialize with the HCatRecordWriter. Called by the enclosing output
- * format right after it creates the writer.
- * @param hCatRecordWriter
- * Internal writer
- */
- private void initialize(
- RecordWriter<WritableComparable<?>,
- HCatRecord> hCatRecordWriter) {
- this.hCatRecordWriter = hCatRecordWriter;
- }
-
- /**
- * Get the record writer.
- * @return Record writer to be used for writing.
- */
- protected RecordWriter<WritableComparable<?>,
- HCatRecord> getRecordWriter() {
- return hCatRecordWriter;
- }
-
- /**
- * Get the context.
- *
- * @return Context passed to initialize.
- */
- protected TaskAttemptContext getContext() {
- return context;
- }
-
- @Override
- public void initialize(TaskAttemptContext context) throws IOException {
- this.context = context;
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException,
- InterruptedException {
- // Closes the internal HCatRecordWriter with the context given here.
- hCatRecordWriter.close(context);
- }
-
- }
-
- /**
- * Factory hook: build the (not yet initialized) vertex writer.
- * @return HCatalogVertexWriter
- */
- protected abstract HCatalogVertexWriter createVertexWriter();
-
- /**
- * Build a vertex writer through the factory hook and hand it the HCatalog
- * record writer for this task attempt.
- */
- @Override
- public final VertexWriter<I, V, E> createVertexWriter(
- TaskAttemptContext context) throws IOException,
- InterruptedException {
- HCatalogVertexWriter vertexWriter = createVertexWriter();
- RecordWriter<WritableComparable<?>, HCatRecord> recordWriter =
- hCatOutputFormat.getRecordWriter(context);
- vertexWriter.initialize(recordWriter);
- return vertexWriter;
- }
-
- /**
- * HCatalogVertexWriter that writes each vertex as exactly one row.
- */
- protected abstract class SingleRowHCatalogVertexWriter extends
- HCatalogVertexWriter {
- /**
- * Get the number of columns in an output row.
- * @return int number of columns
- */
- protected abstract int getNumColumns();
-
- /**
- * Fill the record with the data of the vertex.
- * @param record to fill
- * @param vertex to populate record
- */
- protected abstract void fillRecord(HCatRecord record,
- Vertex<I, V, E, ?> vertex);
-
- /**
- * Create a record with getNumColumns() columns and fill it from the
- * vertex via fillRecord.
- * @param vertex to populate record
- * @return HCatRecord newly created
- */
- protected HCatRecord createRecord(Vertex<I, V, E, ?> vertex) {
- HCatRecord record = new DefaultHCatRecord(getNumColumns());
- fillRecord(record, vertex);
- return record;
- }
-
- @Override
- // XXX It is important not to put generic type signature <I,V,E,?> after
- // Vertex. Otherwise, any class that extends this will not compile
- // because of not implementing the VertexWriter#writeVertex. Mystery of
- // Java Generics :(
- @SuppressWarnings("unchecked")
- public final void writeVertex(Vertex vertex) throws IOException,
- InterruptedException {
- // The record is written with a null key.
- getRecordWriter().write(null, createRecord(vertex));
- }
-
- }
-
- /**
- * HCatalogVertexWriter to write each vertex in multiple rows.
- */
- public abstract class MultiRowHCatalogVertexWriter extends
- HCatalogVertexWriter {
- /**
- * Create the records (rows) to write for a vertex.
- * @param vertex to populate records
- * @return Iterable of records
- */
- protected abstract Iterable<HCatRecord> createRecords(
- Vertex<I, V, E, ?> vertex);
-
- @Override
- // XXX Same thing here. No Generics for Vertex here.
- @SuppressWarnings("unchecked")
- public final void writeVertex(Vertex vertex) throws IOException,
- InterruptedException {
- // NOTE(review): keep the typed local; with the raw Vertex argument the
- // call is unchecked, so inlining it into the for-each may not compile.
- Iterable<HCatRecord> records = createRecords(vertex);
- // Every record is written with a null key.
- for (HCatRecord record : records) {
- getRecordWriter().write(null, record);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java
deleted file mode 100644
index d08179d..0000000
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hcatalog;
-
-import org.apache.giraph.graph.VertexValueInputFormat;
-import org.apache.giraph.graph.VertexValueReader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.data.HCatRecord;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * HCatalog {@link VertexValueInputFormat} for reading vertex values from
- * Hive/Pig.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class HCatalogVertexValueInputFormat<I extends
- WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- extends VertexValueInputFormat<I, V, E, M> {
- /**
- * HCatalog input format.
- */
- private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
-
- @Override
- public List<InputSplit> getSplits(JobContext context, int numWorkers)
- throws IOException, InterruptedException {
- // numWorkers is not used here; the splits come from the HCatalog input
- // format's vertex splits.
- return hCatInputFormat.getVertexSplits(context);
- }
-
- /**
- * {@link VertexValueReader} for {@link HCatalogVertexValueInputFormat}.
- * Wraps the HCatalog record reader for a split; each record advanced to by
- * nextVertex() is treated as one vertex.
- */
- protected abstract class HCatalogVertexValueReader
- extends VertexValueReader<I, V, E, M> {
- /** Internal {@link RecordReader}. */
- private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
- /** Context passed to initialize. */
- private TaskAttemptContext context;
-
- @Override
- public final void initialize(InputSplit inputSplit,
- TaskAttemptContext context)
- throws IOException, InterruptedException {
- super.initialize(inputSplit, context);
- // Create the HCatalog reader for this split and initialize it before
- // it is exposed through getRecordReader().
- hCatRecordReader =
- hCatInputFormat.createVertexRecordReader(inputSplit, context);
- hCatRecordReader.initialize(inputSplit, context);
- this.context = context;
- }
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- // One vertex per record: simply advance the underlying reader.
- return hCatRecordReader.nextKeyValue();
- }
-
- @Override
- public final void close() throws IOException {
- hCatRecordReader.close();
- }
-
- @Override
- public final float getProgress() throws IOException, InterruptedException {
- return hCatRecordReader.getProgress();
- }
-
- /**
- * Get the record reader.
- *
- * @return Record reader to be used for reading.
- */
- protected final RecordReader<WritableComparable, HCatRecord>
- getRecordReader() {
- return hCatRecordReader;
- }
-
- /**
- * Get the context.
- *
- * @return Context passed to initialize.
- */
- protected final TaskAttemptContext getContext() {
- return context;
- }
- }
-
- /**
- * Create {@link VertexValueReader}.
- *
- * @return {@link HCatalogVertexValueReader} instance.
- */
- protected abstract HCatalogVertexValueReader createVertexValueReader();
-
- /**
- * Create a reader through the factory hook and initialize it for the given
- * split.
- *
- * @param split Input split to read
- * @param context Task attempt context
- * @return Initialized reader
- * @throws IOException if initializing the reader fails
- * @throws IllegalStateException if interrupted while initializing the
- * reader (the thread's interrupt flag is restored first)
- */
- @Override
- public final VertexValueReader<I, V, E, M>
- createVertexValueReader(InputSplit split, TaskAttemptContext context)
- throws IOException {
- try {
- HCatalogVertexValueReader reader = createVertexValueReader();
- reader.initialize(split, context);
- return reader;
- } catch (InterruptedException e) {
- // Restore the interrupt flag so code further up the stack can still
- // observe the interruption after it is converted to an unchecked
- // exception.
- Thread.currentThread().interrupt();
- throw new IllegalStateException(
- "createVertexValueReader: Interrupted creating reader.", e);
- }
- }
-
- /**
- * {@link HCatalogVertexValueReader} for tables holding a complete vertex
- * value in each row.
- */
- protected abstract class SingleRowHCatalogVertexValueReader
- extends HCatalogVertexValueReader {
- /**
- * Get vertex id from a record.
- *
- * @param record Input record
- * @return I Vertex id
- */
- protected abstract I getVertexId(HCatRecord record);
-
- /**
- * Get vertex value from a record.
- *
- * @param record Input record
- * @return V Vertex value
- */
- protected abstract V getVertexValue(HCatRecord record);
-
- @Override
- public final I getCurrentVertexId() throws IOException,
- InterruptedException {
- return getVertexId(getRecordReader().getCurrentValue());
- }
-
- @Override
- public final V getCurrentVertexValue() throws IOException,
- InterruptedException {
- return getVertexValue(getRecordReader().getCurrentValue());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
deleted file mode 100644
index 7a7c2f8..0000000
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HiveGiraphRunner.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hcatalog;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.giraph.graph.EdgeInputFormat;
-import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.giraph.graph.VertexOutputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Hive Giraph Runner
- */
-public class HiveGiraphRunner implements Tool {
- /**
- * logger
- */
- private static final Logger LOG = Logger.getLogger(HiveGiraphRunner.class);
- /**
- * workers
- */
- protected int workers;
- /**
- * is verbose
- */
- protected boolean isVerbose;
- /**
- * output table partitions
- */
- protected Map<String, String> outputTablePartitionValues;
- /**
- * dbName
- */
- protected String dbName;
- /**
- * vertex input table name
- */
- protected String vertexInputTableName;
- /**
- * vertex input table filter
- */
- protected String vertexInputTableFilterExpr;
- /**
- * edge input table name
- */
- protected String edgeInputTableName;
- /**
- * edge input table filter
- */
- protected String edgeInputTableFilterExpr;
- /**
- * output table name
- */
- protected String outputTableName;
- /** Configuration */
- private Configuration conf;
- /** Skip output? (Useful for testing without writing) */
- private boolean skipOutput = false;
-
- /**
- * vertex class.
- */
- private Class<? extends Vertex> vertexClass;
- /**
- * vertex input format internal.
- */
- private Class<? extends VertexInputFormat> vertexInputFormatClass;
- /**
- * edge input format internal.
- */
- private Class<? extends EdgeInputFormat> edgeInputFormatClass;
- /**
- * vertex output format internal.
- */
- private Class<? extends VertexOutputFormat> vertexOutputFormatClass;
-
- /**
- * Giraph runner class. A class passed as null is offered as a command-line
- * option instead and validated when the arguments are processed.
- *
- * @param vertexClass Vertex class
- * @param vertexInputFormatClass Vertex input format
- * @param edgeInputFormatClass Edge input format
- * @param vertexOutputFormatClass Output format
- */
- protected HiveGiraphRunner(
- Class<? extends Vertex> vertexClass,
- Class<? extends VertexInputFormat> vertexInputFormatClass,
- Class<? extends EdgeInputFormat> edgeInputFormatClass,
- Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
- this.vertexClass = vertexClass;
- this.vertexInputFormatClass = vertexInputFormatClass;
- this.edgeInputFormatClass = edgeInputFormatClass;
- this.vertexOutputFormatClass = vertexOutputFormatClass;
- // Start from a Hive configuration; setConf() can replace it later.
- this.conf = new HiveConf(getClass());
- }
-
- /**
- * main method
- * @param args system arguments
- * @throws Exception any errors from Hive Giraph Runner
- */
- public static void main(String[] args) throws Exception {
- System.exit(ToolRunner.run(
- new HiveGiraphRunner(null, null, null, null), args));
- }
-
- /**
- * Parse the arguments, configure a GiraphJob that reads from and writes to
- * Hive tables through HCatalog, and run it.
- *
- * @param args Command-line arguments
- * @return 0 on success or when only the usage help was printed, -1 on
- * invalid arguments or job failure
- * @throws Exception any other error while setting up or running the job
- */
- @Override
- public final int run(String[] args) throws Exception {
- // process args
- try {
- processArguments(args);
- } catch (InterruptedException e) {
- // processArguments throws this after printing the usage help.
- return 0;
- } catch (IllegalArgumentException e) {
- System.err.println(e.getMessage());
- return -1;
- }
-
- // additional configuration for Hive
- adjustConfigurationForHive(getConf());
-
- // setup GiraphJob
- GiraphJob job = new GiraphJob(getConf(), getClass().getName());
- job.getConfiguration().setVertexClass(vertexClass);
-
- // setup input from Hive
- if (vertexInputFormatClass != null) {
- InputJobInfo vertexInputJobInfo = InputJobInfo.create(dbName,
- vertexInputTableName, vertexInputTableFilterExpr);
- GiraphHCatInputFormat.setVertexInput(job.getInternalJob(),
- vertexInputJobInfo);
- job.getConfiguration().setVertexInputFormatClass(vertexInputFormatClass);
- }
- if (edgeInputFormatClass != null) {
- InputJobInfo edgeInputJobInfo = InputJobInfo.create(dbName,
- edgeInputTableName, edgeInputTableFilterExpr);
- GiraphHCatInputFormat.setEdgeInput(job.getInternalJob(),
- edgeInputJobInfo);
- job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
- }
-
- // setup output to Hive
- // NOTE: the HCatalog output table and schema are configured even when
- // skipOutput is set; only the vertex output format class is left unset.
- HCatOutputFormat.setOutput(job.getInternalJob(), OutputJobInfo.create(
- dbName, outputTableName, outputTablePartitionValues));
- HCatOutputFormat.setSchema(job.getInternalJob(),
- HCatOutputFormat.getTableSchema(job.getInternalJob()));
- if (skipOutput) {
- LOG.warn("run: Warning - Output will be skipped!");
- } else {
- job.getConfiguration().setVertexOutputFormatClass(
- vertexOutputFormatClass);
- }
-
- job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
- // Subclass hook for additional job setup.
- initGiraphJob(job);
-
- return job.run(isVerbose) ? 0 : -1;
- }
-
- /**
- * Add the files the workers need for Hive to the configuration:
- * hive-site.xml (via "tmpfiles") and every existing regular file listed in
- * the HADOOP_CLASSPATH environment variable (via "tmpjars"). Either part
- * is skipped with a warning when its source is not available, instead of
- * failing with a NullPointerException.
- *
- * @param conf Configuration argument
- */
- private static void adjustConfigurationForHive(Configuration conf) {
- // when output partitions are used, workers register them to the
- // metastore at cleanup stage, and on HiveConf's initialization, it
- // looks for hive-site.xml from.
- java.net.URL hiveSiteUrl =
- conf.getClassLoader().getResource("hive-site.xml");
- if (hiveSiteUrl != null) {
- addToStringCollection(conf, "tmpfiles", hiveSiteUrl.toString());
- } else {
- LOG.warn("adjustConfigurationForHive: hive-site.xml not found on " +
- "the classpath, not shipping it to the workers");
- }
-
- // Also, you need hive.aux.jars as well
- // addToStringCollection(conf, "tmpjars",
- // conf.getStringCollection("hive.aux.jars.path"));
-
- // Or, more effectively, we can provide all the jars client needed to
- // the workers as well
- String hadoopClasspath = System.getenv("HADOOP_CLASSPATH");
- if (hadoopClasspath == null) {
- LOG.warn("adjustConfigurationForHive: HADOOP_CLASSPATH is not set, " +
- "not shipping client jars to the workers");
- return;
- }
- String[] hadoopJars = hadoopClasspath.split(File.pathSeparator);
- List<String> hadoopJarURLs = Lists.newArrayList();
- for (String jarPath : hadoopJars) {
- File file = new File(jarPath);
- // Only existing regular files are shipped; directories are skipped.
- if (file.exists() && file.isFile()) {
- String jarURL = file.toURI().toString();
- hadoopJarURLs.add(jarURL);
- }
- }
- addToStringCollection(conf, "tmpjars", hadoopJarURLs);
- }
-
- /**
- * Parse the command line, validate the required options and populate the
- * runner's fields and configuration from them.
- *
- * @param args to process
- * @return CommandLine instance
- * @throws ParseException error parsing arguments
- * @throws InterruptedException if the usage help was printed (no
- * arguments or --help), meaning the job should not run
- * @throws IllegalArgumentException if a required option is missing or has
- * an invalid value
- */
- private CommandLine processArguments(String[] args) throws ParseException,
- InterruptedException {
- Options options = new Options();
- options.addOption("h", "help", false, "Help");
- options.addOption("v", "verbose", false, "Verbose");
- options.addOption("D", "hiveconf", true,
- "property=value for Hive/Hadoop configuration");
- options.addOption("w", "workers", true, "Number of workers");
- // Classes fixed by the constructor are not offered as options.
- if (vertexClass == null) {
- options.addOption(null, "vertexClass", true,
- "Giraph Vertex class to use");
- }
- if (vertexInputFormatClass == null) {
- options.addOption(null, "vertexInputFormatClass", true,
- "Giraph HCatalogVertexInputFormat class to use");
- }
- if (edgeInputFormatClass == null) {
- options.addOption(null, "edgeInputFormatClass", true,
- "Giraph HCatalogEdgeInputFormat class to use");
- }
-
- if (vertexOutputFormatClass == null) {
- options.addOption(null, "vertexOutputFormatClass", true,
- "Giraph HCatalogVertexOutputFormat class to use");
- }
-
- options.addOption("db", "database", true, "Hive database name");
- options.addOption("vi", "vertexInputTable", true,
- "Vertex input table name");
- options.addOption("VI", "vertexInputFilter", true,
- "Vertex input table filter expression (e.g., \"a<2 AND b='two'\"");
- options.addOption("ei", "edgeInputTable", true,
- "Edge input table name");
- options.addOption("EI", "edgeInputFilter", true,
- "Edge input table filter expression (e.g., \"a<2 AND b='two'\"");
- options.addOption("o", "outputTable", true, "Output table name");
- options.addOption("O", "outputPartition", true,
- "Output table partition values (e.g., \"a=1,b=two\")");
- options.addOption("s", "skipOutput", false, "Skip output?");
-
- addMoreOptions(options);
-
- CommandLineParser parser = new GnuParser();
- final CommandLine cmdln = parser.parse(options, args);
- if (args.length == 0 || cmdln.hasOption("help")) {
- new HelpFormatter().printHelp(getClass().getName(), options, true);
- throw new InterruptedException();
- }
-
- // Giraph classes
- if (cmdln.hasOption("vertexClass")) {
- vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
- Vertex.class);
- }
- if (cmdln.hasOption("vertexInputFormatClass")) {
- vertexInputFormatClass = findClass(
- cmdln.getOptionValue("vertexInputFormatClass"),
- HCatalogVertexInputFormat.class);
- }
- if (cmdln.hasOption("edgeInputFormatClass")) {
- edgeInputFormatClass = findClass(
- cmdln.getOptionValue("edgeInputFormatClass"),
- HCatalogEdgeInputFormat.class);
- }
-
- if (cmdln.hasOption("vertexOutputFormatClass")) {
- vertexOutputFormatClass = findClass(
- cmdln.getOptionValue("vertexOutputFormatClass"),
- HCatalogVertexOutputFormat.class);
- }
-
- if (cmdln.hasOption("skipOutput")) {
- skipOutput = true;
- }
-
- if (vertexClass == null) {
- throw new IllegalArgumentException(
- "Need the Giraph Vertex class name (-vertexClass) to use");
- }
- if (vertexInputFormatClass == null && edgeInputFormatClass == null) {
- throw new IllegalArgumentException(
- "Need at least one of Giraph VertexInputFormat " +
- "class name (-vertexInputFormatClass) and " +
- "EdgeInputFormat class name (-edgeInputFormatClass)");
- }
- if (vertexOutputFormatClass == null) {
- throw new IllegalArgumentException(
- "Need the Giraph VertexOutputFormat " +
- "class name (-vertexOutputFormatClass) to use");
- }
- if (!cmdln.hasOption("workers")) {
- throw new IllegalArgumentException(
- "Need to choose the number of workers (-w)");
- }
- if (!cmdln.hasOption("vertexInputTable") &&
- vertexInputFormatClass != null) {
- throw new IllegalArgumentException(
- "Need to set the vertex input table name (-vi)");
- }
- if (!cmdln.hasOption("edgeInputTable") &&
- edgeInputFormatClass != null) {
- throw new IllegalArgumentException(
- "Need to set the edge input table name (-ei)");
- }
- if (!cmdln.hasOption("outputTable")) {
- throw new IllegalArgumentException(
- "Need to set the output table name (-o)");
- }
- // The option is registered as -db/--database; looking it up under the
- // unregistered name "dbName" always produced the "default" database.
- dbName = cmdln.getOptionValue("database", "default");
- vertexInputTableName = cmdln.getOptionValue("vertexInputTable");
- vertexInputTableFilterExpr = cmdln.getOptionValue("vertexInputFilter");
- edgeInputTableName = cmdln.getOptionValue("edgeInputTable");
- edgeInputTableFilterExpr = cmdln.getOptionValue("edgeInputFilter");
- outputTableName = cmdln.getOptionValue("outputTable");
- outputTablePartitionValues = HiveUtils.parsePartitionValues(cmdln
- .getOptionValue("outputPartition"));
- workers = Integer.parseInt(cmdln.getOptionValue("workers"));
- isVerbose = cmdln.hasOption("verbose");
-
- // pick up -hiveconf arguments; getOptionValues() returns null when the
- // option was not given at all, so guard before iterating
- String[] hiveconfArgs = cmdln.getOptionValues("hiveconf");
- if (hiveconfArgs != null) {
- for (String hiveconf : hiveconfArgs) {
- String[] keyval = hiveconf.split("=", 2);
- if (keyval.length == 2) {
- String name = keyval[0];
- String value = keyval[1];
- // tmpjars/tmpfiles accumulate values instead of being overwritten
- if (name.equals("tmpjars") || name.equals("tmpfiles")) {
- addToStringCollection(
- conf, name, value);
- } else {
- conf.set(name, value);
- }
- }
- }
- }
-
- processMoreArguments(cmdln);
-
- return cmdln;
- }
-
- /**
- * Append values to a string-collection property (varargs convenience
- * overload).
- * @param conf Configuration
- * @param name name of the property to extend
- * @param values values to append
- */
- private static void addToStringCollection(Configuration conf, String name,
- String... values) {
- Collection<String> asCollection = Arrays.asList(values);
- addToStringCollection(conf, name, asCollection);
- }
-
- /**
- * Append values to a string-collection property: read the current values,
- * add the new ones and write the combined list back.
- * @param conf Configuration
- * @param name name of the property to extend
- * @param values values to append
- */
- private static void addToStringCollection(
- Configuration conf, String name, Collection
- <? extends String> values) {
- Collection<String> merged = conf.getStringCollection(name);
- merged.addAll(values);
- String[] asArray = merged.toArray(new String[merged.size()]);
- conf.setStrings(name, asArray);
- }
-
- /**
- * Load a class by name and check that it is a subtype of the expected
- * base class.
- *
- * @param className to find
- * @param base base class
- * @param <T> class type found
- * @return type found
- * @throws IllegalArgumentException if the class cannot be found or is not
- * a subtype of base
- */
- private <T> Class<? extends T> findClass(String className, Class<T> base) {
- try {
- Class<?> cls = Class.forName(className);
- if (!base.isAssignableFrom(cls)) {
- // Fail loudly: returning null here used to surface later as a
- // misleading "class name missing" error, or was silently ignored.
- throw new IllegalArgumentException(className +
- ": Not a subclass of " + base.getName());
- }
- return cls.asSubclass(base);
- } catch (ClassNotFoundException e) {
- // Keep the cause so the original lookup failure is not lost.
- throw new IllegalArgumentException(
- className + ": Invalid class name", e);
- }
- }
-
- @Override
- public final Configuration getConf() {
- return conf;
- }
-
- @Override
- public final void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- /**
- * Override this method to add more command-line options. You can process
- * them by also overriding {@link #processMoreArguments(CommandLine)}.
- *
- * @param options Options
- */
- protected void addMoreOptions(Options options) {
- }
-
- /**
- * Override this method to process additional command-line arguments. You
- * may want to declare additional options by also overriding
- * {@link #addMoreOptions(Options)}.
- *
- * @param cmd Command
- */
- protected void processMoreArguments(CommandLine cmd) {
- }
-
- /**
- * Override this method to do additional setup with the GiraphJob that will
- * run.
- *
- * @param job
- * GiraphJob that is going to run
- */
- protected void initGiraphJob(GiraphJob job) {
- LOG.info(getClass().getSimpleName() + " with");
- String prefix = "\t";
- LOG.info(prefix + "-vertexClass=" +
- vertexClass.getCanonicalName());
- if (vertexInputFormatClass != null) {
- LOG.info(prefix + "-vertexInputFormatClass=" +
- vertexInputFormatClass.getCanonicalName());
- }
- if (edgeInputFormatClass != null) {
- LOG.info(prefix + "-edgeInputFormatClass=" +
- edgeInputFormatClass.getCanonicalName());
- }
- LOG.info(prefix + "-vertexOutputFormatClass=" +
- vertexOutputFormatClass.getCanonicalName());
- if (vertexInputTableName != null) {
- LOG.info(prefix + "-vertexInputTable=" + vertexInputTableName);
- }
- if (vertexInputTableFilterExpr != null) {
- LOG.info(prefix + "-vertexInputFilter=\"" +
- vertexInputTableFilterExpr + "\"");
- }
- if (edgeInputTableName != null) {
- LOG.info(prefix + "-edgeInputTable=" + edgeInputTableName);
- }
- if (edgeInputTableFilterExpr != null) {
- LOG.info(prefix + "-edgeInputFilter=\"" +
- edgeInputTableFilterExpr + "\"");
- }
- LOG.info(prefix + "-outputTable=" + outputTableName);
- if (outputTablePartitionValues != null) {
- LOG.info(prefix + "-outputPartition=\"" +
- outputTablePartitionValues + "\"");
- }
- LOG.info(prefix + "-workers=" + workers);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
deleted file mode 100644
index c1f76f1..0000000
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HiveUtils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hcatalog;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Helper methods for dealing with Hive tables.
- */
-public class HiveUtils {
- // TODO use Hive util class if this is already provided by it
-
- /** Splits "a=1,b=two" into trimmed, non-empty "key=value" entries. */
- private static final Splitter ENTRY_SPLITTER =
- Splitter.on(',').omitEmptyStrings().trimResults();
- /** Splits one "key=value" entry into trimmed, non-empty parts. */
- private static final Splitter KEY_VALUE_SPLITTER =
- Splitter.on('=').omitEmptyStrings().trimResults();
-
- /**
- * Not instantiable: static helpers only.
- */
- private HiveUtils() {
- // Do nothing.
- }
-
- /**
- * Parse a partition specification such as "a=1,b=two" into a map from
- * partition key to partition value.
- *
- * @param outputTablePartitionString table partition string
- * @return Map of partition values, or null if the string is null
- */
- public static Map<String, String> parsePartitionValues(
- String outputTablePartitionString) {
- if (outputTablePartitionString == null) {
- return null;
- }
- Map<String, String> parsed = Maps.newHashMap();
- for (String entry : ENTRY_SPLITTER.split(outputTablePartitionString)) {
- List<String> parts = Lists.newArrayList(KEY_VALUE_SPLITTER.split(entry));
- // Exactly one key and one value are expected per entry.
- if (parts.size() != 2) {
- throw new IllegalArgumentException(
- "Unrecognized partition value format: " +
- outputTablePartitionString);
- }
- String key = parts.get(0);
- String value = parts.get(1);
- parsed.put(key, value);
- }
- return parsed;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/package-info.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/package-info.java
deleted file mode 100644
index b01e254..0000000
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package of input and output format classes
- * for loading and storing Hive/Pig data using HCatalog.
- */
-package org.apache.giraph.io.hcatalog;
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-formats/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java b/giraph-formats/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java
deleted file mode 100644
index 1f25709..0000000
--- a/giraph-formats/src/main/java/org/apache/hcatalog/mapreduce/HCatUtils.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hcatalog.mapreduce;
-
-import org.apache.giraph.io.hcatalog.GiraphHCatInputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hcatalog.common.ErrorType;
-import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.thrift.TException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Utility methods copied from HCatalog because of visibility restrictions.
- * The class is declared in the org.apache.hcatalog.mapreduce package and
- * uses InternalUtil, PartInfo, HCatRecordReader and HCatSplit without
- * importing them (presumably to reach members that are not public there;
- * confirm against the HCatalog version in use).
- */
-public class HCatUtils {
- /**
- * Don't instantiate.
- */
- private HCatUtils() { }
-
- /**
- * Returns the given InputJobInfo after populating with data queried from the
- * metadata service.
- *
- * The table info and the list of partitions are set on the passed-in
- * object. For a partitioned table the partitions are listed using the
- * filter of the InputJobInfo, and the call fails if more partitions are
- * returned than "hcat.metastore.maxpartitions" allows (default 100000).
- * For a non-partitioned table a single partition info with an empty
- * partition value map is added. The metastore client is closed before
- * returning, whether or not an exception was thrown.
- *
- * NOTE(review): when conf is null a HiveConf is built from
- * GiraphHCatInputFormat.class, but the null conf is still forwarded to
- * extractPartInfo; confirm that the storage handler lookup accepts null.
- *
- * @param conf Configuration used to build the HiveConf; may be null
- * @param inputJobInfo Input job info
- * @return Populated input job info (the same instance that was passed in)
- * @throws IOException if the partition limit is exceeded, or wrapping a
- * MetaException, NoSuchObjectException, TException or HiveException
- */
- public static InputJobInfo getInputJobInfo(
- Configuration conf, InputJobInfo inputJobInfo)
- throws IOException {
- HiveMetaStoreClient client = null;
- HiveConf hiveConf;
- try {
- if (conf != null) {
- hiveConf = HCatUtil.getHiveConf(conf);
- } else {
- hiveConf = new HiveConf(GiraphHCatInputFormat.class);
- }
- client = HCatUtil.getHiveClient(hiveConf);
- Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(),
- inputJobInfo.getTableName());
-
- List<PartInfo> partInfoList = new ArrayList<PartInfo>();
-
- inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
- if (table.getPartitionKeys().size() != 0) {
- // Partitioned table
- List<Partition> parts = client.listPartitionsByFilter(
- inputJobInfo.getDatabaseName(),
- inputJobInfo.getTableName(),
- inputJobInfo.getFilter(),
- (short) -1);
-
- if (parts != null) {
- // Default to 100,000 partitions if hcat.metastore.maxpartitions is not
- // defined
- int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
- if (parts.size() > maxPart) {
- throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART,
- "total number of partitions is " + parts.size());
- }
-
- // Populate partition info
- for (Partition ptn : parts) {
- HCatSchema schema = HCatUtil.extractSchema(
- new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
- PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
- ptn.getParameters(), conf, inputJobInfo);
- partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table,
- ptn));
- partInfoList.add(partInfo);
- }
- }
- } else {
- // Non partitioned table
- HCatSchema schema = HCatUtil.extractSchema(table);
- PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
- table.getParameters(), conf, inputJobInfo);
- partInfo.setPartitionValues(new HashMap<String, String>());
- partInfoList.add(partInfo);
- }
- inputJobInfo.setPartitions(partInfoList);
- } catch (MetaException e) {
- throw new IOException("Got MetaException", e);
- } catch (NoSuchObjectException e) {
- throw new IOException("Got NoSuchObjectException", e);
- } catch (TException e) {
- throw new IOException("Got TException", e);
- } catch (HiveException e) {
- throw new IOException("Got HiveException", e);
- } finally {
- HCatUtil.closeHiveClientQuietly(client);
- }
- return inputJobInfo;
- }
-
- /**
- * Extract partition info.
- *
- * Looks up the storer info and storage handler for the storage descriptor,
- * asks the storage handler for its input job properties, and copies every
- * entry of parameters into a fresh Properties object. The result combines
- * these with the schema, the storage location and the table info already
- * set on inputJobInfo.
- *
- * @param schema Table schema
- * @param sd Storage descriptor
- * @param parameters Parameters (table or partition parameters; must not be
- * null because its entries are iterated)
- * @param conf Configuration
- * @param inputJobInfo Input job info
- * @return Partition info
- * @throws IOException declared for the HCatalog helper calls made here
- */
- private static PartInfo extractPartInfo(
- HCatSchema schema, StorageDescriptor sd, Map<String, String> parameters,
- Configuration conf, InputJobInfo inputJobInfo) throws IOException {
- StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd, parameters);
-
- Properties hcatProperties = new Properties();
- HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf,
- storerInfo);
-
- // Copy the properties from storageHandler to jobProperties
- Map<String, String> jobProperties =
- HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);
-
- for (Map.Entry<String, String> param : parameters.entrySet()) {
- hcatProperties.put(param.getKey(), param.getValue());
- }
-
- return new PartInfo(schema, storageHandler, sd.getLocation(),
- hcatProperties, jobProperties, inputJobInfo.getTableInfo());
- }
-
- /**
- * Create a new {@link HCatRecordReader}.
- * The reader is returned as a raw {@link RecordReader}, so callers choose
- * the key/value types themselves.
- *
- * @param storageHandler Storage handler
- * @param valuesNotInDataCols Values not in data columns
- * @return Record reader
- */
- public static RecordReader newHCatReader(
- HCatStorageHandler storageHandler,
- Map<String, String> valuesNotInDataCols) {
- return new HCatRecordReader(storageHandler, valuesNotInDataCols);
- }
-
- /**
- * Cast an {@link InputSplit} to {@link HCatSplit}.
- * Simply delegates to InternalUtil.castToHCatSplit.
- *
- * @param split Input split
- * @return {@link HCatSplit}
- * @throws IOException if thrown by the delegate
- */
- public static HCatSplit castToHCatSplit(InputSplit split)
- throws IOException {
- return InternalUtil.castToHCatSplit(split);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-formats/src/main/java/org/apache/hcatalog/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/hcatalog/mapreduce/package-info.java b/giraph-formats/src/main/java/org/apache/hcatalog/mapreduce/package-info.java
deleted file mode 100644
index e236aaf..0000000
--- a/giraph-formats/src/main/java/org/apache/hcatalog/mapreduce/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package for HCatalog helper utilities.
- */
-package org.apache.hcatalog.mapreduce;
http://git-wip-us.apache.org/repos/asf/giraph/blob/57ea5561/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java b/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
deleted file mode 100644
index 5885b64..0000000
--- a/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.accumulo;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.giraph.BspCase;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeInputFormat;
-import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeOutputFormat;
-import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.Map;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/*
- Test class for Accumulo vertex input/output formats.
- It writes a two-row graph into an Accumulo MockInstance, runs a Giraph job
- over it with the edgemarker input/output formats, and scans the table to
- check what the job wrote back.
- */
-public class TestAccumuloVertexFormat extends BspCase{
-
- private final String TABLE_NAME = "simple_graph";
- private final String INSTANCE_NAME = "instance";
- private final Text FAMILY = new Text("cf");
- private final Text CHILDREN = new Text("children");
- private final String USER = "root";
- private final byte[] PASSWORD = new byte[] {};
- private final Text OUTPUT_FIELD = new Text("parent");
-
-
- private final Logger log = Logger.getLogger(TestAccumuloVertexFormat.class);
-
- /**
- * Create the test case, named after this class.
- */
- public TestAccumuloVertexFormat() {
- super(TestAccumuloVertexFormat.class.getName());
- }
-
- /*
- Write a simple parent-child directed graph to Accumulo.
- Run a job which reads the values
- into subclasses that extend AccumuloVertex I/O formats.
- Check the output after the job.
-
- The graph is 0001 -> 0002 -> 0003, stored as a "children" column in
- family "cf". After the job, row 0002 is expected to carry a "parent"
- column whose value is 0001.
-
- The test returns without doing anything when the system property
- "prop.mapred.job.tracker" is set, and fails if no file exists at the
- path given by "prop.jarLocation".
-
- NOTE(review): foundColumn is assigned inside the verification loop but
- is never asserted; the only check that a row was found is the
- hasNext() assertion before the loop.
- NOTE(review): the connector and the input info are set up with literal
- "root" / empty-byte-array credentials instead of the USER and PASSWORD
- constants; the values are the same.
- */
- @Test
- public void testAccumuloInputOutput() throws Exception {
- if (System.getProperty("prop.mapred.job.tracker") != null) {
- if(log.isInfoEnabled())
- log.info("testAccumuloInputOutput: " +
- "Ignore this test if not local mode.");
- return;
- }
-
- File jarTest = new File(System.getProperty("prop.jarLocation"));
- if(!jarTest.exists()) {
- fail("Could not find Giraph jar at " +
- "location specified by 'prop.jarLocation'. " +
- "Make sure you built the main Giraph artifact?.");
- }
-
- // Write vertices and edges out to a mock instance.
- MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
- Connector c = mockInstance.getConnector("root", new byte[] {});
- c.tableOperations().create(TABLE_NAME);
- BatchWriter bw = c.createBatchWriter(TABLE_NAME, 10000L, 1000L, 4);
-
- Mutation m1 = new Mutation(new Text("0001"));
- m1.put(FAMILY, CHILDREN, new Value("0002".getBytes()));
- bw.addMutation(m1);
-
- Mutation m2 = new Mutation(new Text("0002"));
- m2.put(FAMILY, CHILDREN, new Value("0003".getBytes()));
- bw.addMutation(m2);
- if(log.isInfoEnabled())
- log.info("Writing mutations to Accumulo table");
- bw.close();
-
- Configuration conf = new Configuration();
- conf.set(AccumuloVertexOutputFormat.OUTPUT_TABLE, TABLE_NAME);
-
- /*
- Very important to initialize the formats before
- sending configuration to the GiraphJob. Otherwise
- the internally constructed Job in GiraphJob will
- not have the proper context initialization.
- */
- AccumuloInputFormat.setInputInfo(conf, USER, "".getBytes(),
- TABLE_NAME, new Authorizations());
- AccumuloInputFormat.setMockInstance(conf, INSTANCE_NAME);
-
- AccumuloOutputFormat.setOutputInfo(conf, USER, PASSWORD, true, null);
- AccumuloOutputFormat.setMockInstance(conf, INSTANCE_NAME);
-
- GiraphJob job = new GiraphJob(conf, getCallingMethodName());
- setupConfiguration(job);
- GiraphConfiguration giraphConf = job.getConfiguration();
- giraphConf.setVertexClass(EdgeNotification.class);
- giraphConf.setVertexInputFormatClass(AccumuloEdgeInputFormat.class);
- giraphConf.setVertexOutputFormatClass(AccumuloEdgeOutputFormat.class);
-
- HashSet<Pair<Text, Text>> columnsToFetch = new HashSet<Pair<Text,Text>>();
- columnsToFetch.add(new Pair<Text, Text>(FAMILY, CHILDREN));
- AccumuloInputFormat.fetchColumns(job.getConfiguration(), columnsToFetch);
-
- if(log.isInfoEnabled())
- log.info("Running edge notification job using Accumulo input");
- assertTrue(job.run(true));
- Scanner scanner = c.createScanner(TABLE_NAME, new Authorizations());
- scanner.setRange(new Range("0002", "0002"));
- scanner.fetchColumn(FAMILY, OUTPUT_FIELD);
- boolean foundColumn = false;
-
- if(log.isInfoEnabled())
- log.info("Verify job output persisted correctly.");
- // Make sure the scan of row 0002 for the output column returned an entry.
- assertTrue(scanner.iterator().hasNext());
-
-
- // Now check that the expected value from the job persisted correctly.
- for(Map.Entry<Key,Value> entry : scanner) {
- Text row = entry.getKey().getRow();
- assertEquals("0002", row.toString());
- Value value = entry.getValue();
- assertEquals("0001", ByteBufferUtil.toString(
- ByteBuffer.wrap(value.get())));
- foundColumn = true;
- }
- }
- /*
- Test compute method that sends each edge a notification of its parents.
- The test set only has a 1-1 parent-to-child ratio for this unit test.
-
- Every received message overwrites the vertex value, so the last message
- wins. In superstep 0 the vertex sends its own id along all of its edges.
- The vertex votes to halt at the end of every compute call.
- */
- public static class EdgeNotification
- extends EdgeListVertex<Text, Text, Text, Text> {
- @Override
- public void compute(Iterable<Text> messages) throws IOException {
- for (Text message : messages) {
- getValue().set(message);
- }
- if(getSuperstep() == 0) {
- sendMessageToAllEdges(getId());
- }
- voteToHalt();
- }
- }
-}