You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@giraph.apache.org by di...@apache.org on 2020/05/08 22:26:40 UTC
[giraph] branch trunk updated: GIRAPH-1238
This is an automated email from the ASF dual-hosted git repository.
dionysios pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/giraph.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2cae34a GIRAPH-1238
2cae34a is described below
commit 2cae34a3cfd95329b21e0537ddb30afb5b750209
Author: Dionysios Logothetis <dl...@gmail.com>
AuthorDate: Fri May 8 15:26:20 2020 -0700
GIRAPH-1238
closes #124
---
giraph-accumulo/pom.xml | 193 ---------------------
giraph-accumulo/src/main/assembly/compile.xml | 39 -----
.../io/accumulo/AccumuloVertexInputFormat.java | 169 ------------------
.../io/accumulo/AccumuloVertexOutputFormat.java | 174 -------------------
.../apache/giraph/io/accumulo/package-info.java | 21 ---
.../io/accumulo/TestAccumuloVertexFormat.java | 187 --------------------
.../edgemarker/AccumuloEdgeInputFormat.java | 96 ----------
.../edgemarker/AccumuloEdgeOutputFormat.java | 76 --------
giraph-dist/pom.xml | 4 +
pom.xml | 31 ++--
10 files changed, 14 insertions(+), 976 deletions(-)
diff --git a/giraph-accumulo/pom.xml b/giraph-accumulo/pom.xml
deleted file mode 100644
index ba9a110..0000000
--- a/giraph-accumulo/pom.xml
+++ /dev/null
@@ -1,193 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.giraph</groupId>
- <artifactId>giraph-parent</artifactId>
- <version>1.3.0-SNAPSHOT</version>
- </parent>
- <artifactId>giraph-accumulo</artifactId>
- <packaging>jar</packaging>
-
- <name>Apache Giraph Accumulo I/O</name>
- <url>http://giraph.apache.org/giraph-accumulo/</url>
- <description>Giraph Accumulo input/output classes</description>
-
- <properties>
- <top.dir>${project.basedir}/..</top.dir>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-site-plugin</artifactId>
- <configuration>
- <siteDirectory>${project.basedir}/src/site</siteDirectory>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.6</version>
- <configuration>
- <skip>${surefire.skip}</skip>
- <systemProperties>
- <property>
- <name>prop.jarLocation</name>
- <value>${top.dir}/giraph-core/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value>
- </property>
- </systemProperties>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>findbugs-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
- <profiles>
- <profile>
- <id>hadoop_0.20.203</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- </profile>
-
- <profile>
- <id>hadoop_1</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- </profile>
-
- <profile>
- <id>hadoop_2</id>
- <properties>
- <surefire.skip>true</surefire.skip>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- </profile>
-
- <profile>
- <id>hadoop_non_secure</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- </profile>
-
- <profile>
- <id>hadoop_facebook</id>
- <dependencies>
- <dependency>
- <groupId>com.facebook.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- <version>${dep.oldnetty.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
-
- <dependencies>
- <!-- compile dependencies. sorted lexicographically. -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.giraph</groupId>
- <artifactId>giraph-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.giraph</groupId>
- <artifactId>giraph-core</artifactId>
- <type>test-jar</type>
- </dependency>
-
- <!-- provided dependencies. sorted lexicographically. -->
- <dependency>
- <groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo-core</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <!-- test dependencies. sorted lexicographically. -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
diff --git a/giraph-accumulo/src/main/assembly/compile.xml b/giraph-accumulo/src/main/assembly/compile.xml
deleted file mode 100644
index 6acf679..0000000
--- a/giraph-accumulo/src/main/assembly/compile.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
- <id>jar-with-dependencies</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
-
- <dependencySets>
- <dependencySet>
- <useProjectArtifact>true</useProjectArtifact>
- <outputDirectory>/</outputDirectory>
- <unpackOptions>
- <excludes>
- <exclude>META-INF/LICENSE</exclude>
- </excludes>
- </unpackOptions>
- <unpack>true</unpack>
- <scope>runtime</scope>
- </dependencySet>
- </dependencySets>
-</assembly>
\ No newline at end of file
diff --git a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java b/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
deleted file mode 100644
index c286ed4..0000000
--- a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io.accumulo;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.VertexReader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Class which wraps the AccumuloInputFormat. It's designed
- * as an extension point to VertexInputFormat subclasses who wish
- * to read from AccumuloTables.
- *
- * Works with
- * {@link AccumuloVertexOutputFormat}
- *
- * @param <I> vertex id type
- * @param <V> vertex value type
- * @param <E> edge type
- */
-public abstract class AccumuloVertexInputFormat<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable>
- extends VertexInputFormat<I, V, E> {
- /**
- * delegate input format for all accumulo operations.
- */
- protected AccumuloInputFormat accumuloInputFormat =
- new AccumuloInputFormat();
-
- /**
- * Abstract class which provides a template for instantiating vertices
- * from Accumulo Key/Value pairs.
- *
- * @param <I> vertex id type
- * @param <V> vertex value type
- * @param <E> edge type
- */
- public abstract static class AccumuloVertexReader<
- I extends WritableComparable,
- V extends Writable, E extends Writable>
- extends VertexReader<I, V, E> {
-
- /** Giraph configuration */
- private ImmutableClassesGiraphConfiguration<I, V, E>
- configuration;
- /**
- * Used by subclasses to read key/value pairs.
- */
- private final RecordReader<Key, Value> reader;
- /** Context passed to initialize */
- private TaskAttemptContext context;
-
- /**
- * Constructor used to pass Record Reader instance
- * @param reader Accumulo record reader
- */
- public AccumuloVertexReader(RecordReader<Key, Value> reader) {
- this.reader = reader;
- }
-
- public ImmutableClassesGiraphConfiguration<I, V, E>
- getConfiguration() {
- return configuration;
- }
-
- /**
- * initialize the reader.
- *
- * @param inputSplit Input split to be used for reading vertices.
- * @param context Context from the task.
- * @throws IOException
- * @throws InterruptedException
- */
- public void initialize(InputSplit inputSplit,
- TaskAttemptContext context)
- throws IOException, InterruptedException {
- reader.initialize(inputSplit, context);
- this.context = context;
- this.configuration =
- new ImmutableClassesGiraphConfiguration<I, V, E>(
- context.getConfiguration());
- }
-
- /**
- * close
- *
- * @throws IOException
- */
- public void close() throws IOException {
- reader.close();
- }
-
- /**
- * getProgress
- *
- * @return progress
- * @throws IOException
- * @throws InterruptedException
- */
- public float getProgress() throws IOException, InterruptedException {
- return reader.getProgress();
- }
-
- /**
- * Get the result record reader
- *
- * @return Record reader to be used for reading.
- */
- protected RecordReader<Key, Value> getRecordReader() {
- return reader;
- }
-
- /**
- * getContext
- *
- * @return Context passed to initialize.
- */
- protected TaskAttemptContext getContext() {
- return context;
- }
-
- }
-
- @Override
- public List<InputSplit> getSplits(
- JobContext context, int minSplitCountHint)
- throws IOException, InterruptedException {
- List<InputSplit> splits = null;
- try {
- splits = accumuloInputFormat.getSplits(context);
- } catch (IOException e) {
- if (e.getMessage().contains("Input info has not been set")) {
- throw new IOException(e.getMessage() +
- " Make sure you initialized" +
- " AccumuloInputFormat static setters " +
- "before passing the config to GiraphJob.");
- }
- }
- return splits;
- }
-}
diff --git a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexOutputFormat.java b/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexOutputFormat.java
deleted file mode 100644
index 1927ed7..0000000
--- a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexOutputFormat.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io.accumulo;
-
-import java.io.IOException;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.io.VertexWriter;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-/**
- *
- * Class which wraps the AccumuloOutputFormat. It's designed
- * as an extension point to VertexOutputFormat subclasses who wish
- * to write vertices back to an Accumulo table.
- *
- * Works with
- * {@link AccumuloVertexInputFormat}
- *
- *
- * @param <I> vertex id type
- * @param <V> vertex value type
- * @param <E> edge type
- */
-public abstract class AccumuloVertexOutputFormat<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable>
- extends VertexOutputFormat<I, V, E> {
-
-
- /**
- * Output table parameter
- */
- public static final String OUTPUT_TABLE = "OUTPUT_TABLE";
-
- /**
- * Accumulo delegate for table output
- */
- protected AccumuloOutputFormat accumuloOutputFormat =
- new AccumuloOutputFormat();
-
- /**
- *
- * Main abstraction point for vertex writers to persist back
- * to Accumulo tables.
- *
- * @param <I> vertex id type
- * @param <V> vertex value type
- * @param <E> edge type
- */
- public abstract static class AccumuloVertexWriter<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable>
- extends VertexWriter<I, V, E> {
-
- /**
- * task attempt context.
- */
- private TaskAttemptContext context;
-
- /**
- * Accumulo record writer
- */
- private RecordWriter<Text, Mutation> recordWriter;
-
- /**
- * Constructor for use with subclasses
- *
- * @param recordWriter accumulo record writer
- */
- public AccumuloVertexWriter(RecordWriter<Text, Mutation> recordWriter) {
- this.recordWriter = recordWriter;
- }
-
- /**
- * initialize
- *
- * @param context Context used to write the vertices.
- * @throws IOException
- */
- public void initialize(TaskAttemptContext context) throws IOException {
- this.context = context;
- }
-
- /**
- * close
- *
- * @param context the context of the task
- * @throws IOException
- * @throws InterruptedException
- */
- public void close(TaskAttemptContext context)
- throws IOException, InterruptedException {
- recordWriter.close(context);
- }
-
- /**
- * Get the table record writer;
- *
- * @return Record writer to be used for writing.
- */
- public RecordWriter<Text, Mutation> getRecordWriter() {
- return recordWriter;
- }
-
- /**
- * Get the context.
- *
- * @return Context passed to initialize.
- */
- public TaskAttemptContext getContext() {
- return context;
- }
-
- }
- /**
- *
- * checkOutputSpecs
- *
- * @param context information about the job
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- public void checkOutputSpecs(JobContext context)
- throws IOException, InterruptedException {
- try {
- accumuloOutputFormat.checkOutputSpecs(context);
- } catch (IOException e) {
- if (e.getMessage().contains("Output info has not been set")) {
- throw new IOException(e.getMessage() + " Make sure you initialized" +
- " AccumuloOutputFormat static setters " +
- "before passing the config to GiraphJob.");
- }
- }
- }
-
- /**
- * getOutputCommitter
- *
- * @param context the task context
- * @return OutputCommitter
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return accumuloOutputFormat.getOutputCommitter(context);
- }
-}
diff --git a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/package-info.java b/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/package-info.java
deleted file mode 100644
index e75d292..0000000
--- a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Accumulo Input/Output for Giraph
- */
-package org.apache.giraph.io.accumulo;
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
deleted file mode 100644
index 0ee9666..0000000
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.accumulo;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.giraph.BspCase;
-import org.apache.giraph.graph.BasicComputation;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeInputFormat;
-import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeOutputFormat;
-import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/*
- Test class for Accumulo vertex input/output formats.
- */
-public class TestAccumuloVertexFormat extends BspCase{
-
- private final String TABLE_NAME = "simple_graph";
- private final String INSTANCE_NAME = "instance";
- private final Text FAMILY = new Text("cf");
- private final Text CHILDREN = new Text("children");
- private final String USER = "root";
- private final byte[] PASSWORD = new byte[] {};
- private final Text OUTPUT_FIELD = new Text("parent");
-
-
- private final Logger log = Logger.getLogger(TestAccumuloVertexFormat.class);
-
- /**
- * Create the test case
- */
- public TestAccumuloVertexFormat() {
- super(TestAccumuloVertexFormat.class.getName());
- }
-
- /*
- Write a simple parent-child directed graph to Accumulo.
- Run a job which reads the values
- into subclasses that extend AccumuloVertex I/O formats.
- Check the output after the job.
- */
- @Test
- public void testAccumuloInputOutput() throws Exception {
- if (System.getProperty("prop.mapred.job.tracker") != null) {
- if(log.isInfoEnabled())
- log.info("testAccumuloInputOutput: " +
- "Ignore this test if not local mode.");
- return;
- }
-
- File jarTest = new File(System.getProperty("prop.jarLocation"));
- if(!jarTest.exists()) {
- fail("Could not find Giraph jar at " +
- "location specified by 'prop.jarLocation'. " +
- "Make sure you built the main Giraph artifact?.");
- }
-
- //Write out vertices and edges out to a mock instance.
- MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
- Connector c = mockInstance.getConnector("root", new byte[] {});
- c.tableOperations().create(TABLE_NAME);
- BatchWriter bw = c.createBatchWriter(TABLE_NAME, 10000L, 1000L, 4);
-
- Mutation m1 = new Mutation(new Text("0001"));
- m1.put(FAMILY, CHILDREN, new Value("0002".getBytes()));
- bw.addMutation(m1);
-
- Mutation m2 = new Mutation(new Text("0002"));
- m2.put(FAMILY, CHILDREN, new Value("0003".getBytes()));
- bw.addMutation(m2);
- if(log.isInfoEnabled())
- log.info("Writing mutations to Accumulo table");
- bw.close();
-
- Configuration conf = new Configuration();
- conf.set(AccumuloVertexOutputFormat.OUTPUT_TABLE, TABLE_NAME);
-
- /*
- Very important to initialize the formats before
- sending configuration to the GiraphJob. Otherwise
- the internally constructed Job in GiraphJob will
- not have the proper context initialization.
- */
- AccumuloInputFormat.setInputInfo(conf, USER, "".getBytes(),
- TABLE_NAME, new Authorizations());
- AccumuloInputFormat.setMockInstance(conf, INSTANCE_NAME);
-
- AccumuloOutputFormat.setOutputInfo(conf, USER, PASSWORD, true, null);
- AccumuloOutputFormat.setMockInstance(conf, INSTANCE_NAME);
-
- GiraphJob job = new GiraphJob(conf, getCallingMethodName());
- setupConfiguration(job);
- GiraphConfiguration giraphConf = job.getConfiguration();
- giraphConf.setComputationClass(EdgeNotification.class);
- giraphConf.setVertexInputFormatClass(AccumuloEdgeInputFormat.class);
- giraphConf.setVertexOutputFormatClass(AccumuloEdgeOutputFormat.class);
-
- HashSet<Pair<Text, Text>> columnsToFetch = new HashSet<Pair<Text,Text>>();
- columnsToFetch.add(new Pair<Text, Text>(FAMILY, CHILDREN));
- AccumuloInputFormat.fetchColumns(job.getConfiguration(), columnsToFetch);
-
- if(log.isInfoEnabled())
- log.info("Running edge notification job using Accumulo input");
- assertTrue(job.run(true));
- Scanner scanner = c.createScanner(TABLE_NAME, new Authorizations());
- scanner.setRange(new Range("0002", "0002"));
- scanner.fetchColumn(FAMILY, OUTPUT_FIELD);
- boolean foundColumn = false;
-
- if(log.isInfoEnabled())
- log.info("Verify job output persisted correctly.");
- //make sure we found the qualifier.
- assertTrue(scanner.iterator().hasNext());
-
-
- //now we check to make sure the expected value from the job persisted correctly.
- for(Map.Entry<Key,Value> entry : scanner) {
- Text row = entry.getKey().getRow();
- assertEquals("0002", row.toString());
- Value value = entry.getValue();
- assertEquals("0001", ByteBufferUtil.toString(
- ByteBuffer.wrap(value.get())));
- foundColumn = true;
- }
- }
- /*
- Test compute method that sends each edge a notification of its parents.
- The test set only has a 1-1 parent-to-child ratio for this unit test.
- */
- public static class EdgeNotification
- extends BasicComputation<Text, Text, Text, Text> {
- @Override
- public void compute(Vertex<Text, Text, Text> vertex,
- Iterable<Text> messages) throws IOException {
- for (Text message : messages) {
- vertex.getValue().set(message);
- }
- if(getSuperstep() == 0) {
- sendMessageToAllEdges(vertex, vertex.getId());
- }
- vertex.voteToHalt();
- }
- }
-}
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
deleted file mode 100644
index ff79d7a..0000000
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io.accumulo.edgemarker;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.EdgeFactory;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.regex.Pattern;
-
-/*
- Example subclass which reads in Key/Value pairs to construct vertex objects.
- */
-public class AccumuloEdgeInputFormat
- extends AccumuloVertexInputFormat<Text, Text, Text> {
- @Override public void checkInputSpecs(Configuration conf) { }
-
- private static final Text uselessEdgeValue = new Text();
- public VertexReader<Text, Text, Text>
- createVertexReader(InputSplit split, TaskAttemptContext context)
- throws IOException {
- try {
-
- return new AccumuloEdgeVertexReader(
- accumuloInputFormat.createRecordReader(split, context)) {
- };
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
-
- }
- /*
- Reader takes Key/Value pairs from the underlying input format.
- */
- public static class AccumuloEdgeVertexReader
- extends AccumuloVertexReader<Text, Text, Text> {
-
- public static final Pattern commaPattern = Pattern.compile("[,]");
-
- public AccumuloEdgeVertexReader(RecordReader<Key, Value> recordReader) {
- super(recordReader);
- }
-
-
- public boolean nextVertex() throws IOException, InterruptedException {
- return getRecordReader().nextKeyValue();
- }
-
- /*
- Each Key/Value contains the information needed to construct the vertices.
- */
- public Vertex<Text, Text, Text> getCurrentVertex()
- throws IOException, InterruptedException {
- Key key = getRecordReader().getCurrentKey();
- Value value = getRecordReader().getCurrentValue();
- Vertex<Text, Text, Text> vertex =
- getConfiguration().createVertex();
- Text vertexId = key.getRow();
- List<Edge<Text, Text>> edges = Lists.newLinkedList();
- String edge = new String(value.get());
- Text edgeId = new Text(edge);
- edges.add(EdgeFactory.create(edgeId, uselessEdgeValue));
- vertex.initialize(vertexId, new Text(), edges);
-
- return vertex;
- }
- }
-}
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
deleted file mode 100644
index c2ebbe2..0000000
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io.accumulo.edgemarker;
-
-import java.io.IOException;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexWriter;
-import org.apache.giraph.io.accumulo.AccumuloVertexOutputFormat;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/*
- Example subclass for writing vertices back to Accumulo.
- */
-public class AccumuloEdgeOutputFormat
- extends AccumuloVertexOutputFormat<Text, Text, Text> {
-
- public VertexWriter<Text, Text, Text>
- createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- RecordWriter<Text, Mutation> writer =
- accumuloOutputFormat.getRecordWriter(context);
- String tableName = getConf().get(OUTPUT_TABLE);
- if(tableName == null)
- throw new IOException("Forgot to set table name " +
- "using AccumuloVertexOutputFormat.OUTPUT_TABLE");
- return new AccumuloEdgeVertexWriter(writer, tableName);
- }
-
- /*
- Wraps RecordWriter for writing Mutations back to the configured Accumulo Table.
- */
- public static class AccumuloEdgeVertexWriter
- extends AccumuloVertexWriter<Text, Text, Text> {
-
- private final Text CF = new Text("cf");
- private final Text PARENT = new Text("parent");
- private Text tableName;
-
- public AccumuloEdgeVertexWriter(
- RecordWriter<Text, Mutation> writer, String tableName) {
- super(writer);
- this.tableName = new Text(tableName);
- }
- /*
- Write back a mutation that adds a qualifier for 'parent' containing the vertex value
- as the cell value. Assume the vertex ID corresponds to a key.
- */
- public void writeVertex(Vertex<Text, Text, Text> vertex)
- throws IOException, InterruptedException {
- RecordWriter<Text, Mutation> writer = getRecordWriter();
- Mutation mt = new Mutation(vertex.getId());
- mt.put(CF, PARENT, new Value(
- vertex.getValue().toString().getBytes()));
- writer.write(tableName, mt);
- }
- }
-}
diff --git a/giraph-dist/pom.xml b/giraph-dist/pom.xml
index 10b7a1b..91ff29c 100644
--- a/giraph-dist/pom.xml
+++ b/giraph-dist/pom.xml
@@ -95,7 +95,11 @@
</dependency>
<dependency>
<groupId>org.apache.giraph</groupId>
+<<<<<<< HEAD
<artifactId>giraph-accumulo</artifactId>
+=======
+ <artifactId>giraph-hcatalog</artifactId>
+>>>>>>> 8bf0c60fad64676bc6181dac7320df8d3bbedcbf
</dependency>
<dependency>
<groupId>org.apache.giraph</groupId>
diff --git a/pom.xml b/pom.xml
index 8a92238..4b9f89a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -312,7 +312,6 @@ under the License.
<checkstyle.config.path>${top.dir}/checkstyle.xml</checkstyle.config.path>
<dep.avro.version>1.7.6</dep.avro.version>
- <dep.accumulo.version>1.4.0</dep.accumulo.version>
<dep.asm.version>3.2</dep.asm.version>
<dep.airline.version>0.5</dep.airline.version>
<dep.base64.version>2.3.8</dep.base64.version>
@@ -987,8 +986,12 @@ under the License.
<profile>
<id>hadoop_1</id>
<modules>
+<<<<<<< HEAD
<module>giraph-accumulo</module>
<<<<<<< HEAD
+=======
+ <module>giraph-hcatalog</module>
+>>>>>>> 8bf0c60fad64676bc6181dac7320df8d3bbedcbf
<module>giraph-gora</module>
=======
<module>giraph-hcatalog</module>
@@ -1141,8 +1144,11 @@ under the License.
<profile>
<id>hadoop_2</id>
<modules>
+<<<<<<< HEAD
<module>giraph-accumulo</module>
<<<<<<< HEAD
+=======
+>>>>>>> 8bf0c60fad64676bc6181dac7320df8d3bbedcbf
<module>giraph-hbase</module>
<<<<<<< HEAD
=======
@@ -1525,12 +1531,15 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.giraph</groupId>
+<<<<<<< HEAD
<artifactId>giraph-accumulo</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.giraph</groupId>
<<<<<<< HEAD
+=======
+>>>>>>> 8bf0c60fad64676bc6181dac7320df8d3bbedcbf
<artifactId>giraph-rexster-io</artifactId>
<version>${project.version}</version>
</dependency>
@@ -1798,26 +1807,6 @@ under the License.
<version>${dep.commons-net.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo-core</artifactId>
- <version>${dep.accumulo.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>avro</artifactId>
<version>${dep.avro.version}</version>