You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2018/05/16 15:53:20 UTC
[2/3] incubator-rya git commit: RYA-487 Closes #296,
Implement Kafka Connect Sink implementations for Accumulo and Mongo
DB backed Rya.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java
new file mode 100644
index 0000000..e90042d
--- /dev/null
+++ b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.eclipse.rdf4j.common.iteration.CloseableIteration;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailConnection;
+import org.eclipse.rdf4j.sail.SailException;
+import org.eclipse.rdf4j.sail.memory.MemoryStore;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link RyaSinkTask}.
+ */
+public class RyaSinkTaskTest {
+
+ /**
+ * A {@link RyaSinkTask} that is backed by a single, lazily created in-memory {@link MemoryStore}.
+ */
+ private static final class InMemoryRyaSinkTask extends RyaSinkTask {
+
+ private Sail sail = null;
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+ // Do nothing. Always assume the Rya Instance exists.
+ }
+
+ @Override
+ protected Sail makeSail(final Map<String, String> taskConfig) {
+ if(sail == null) {
+ sail = new MemoryStore();
+ sail.initialize();
+ }
+ return sail;
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void start_ryaInstanceDoesNotExist() {
+ // Create the task that will be tested.
+ final RyaSinkTask task = new RyaSinkTask() {
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+ throw new IllegalStateException("It doesn't exist.");
+ }
+
+ @Override
+ protected Sail makeSail(final Map<String, String> taskConfig) { return null; }
+ };
+
+ // Since the Rya instance does not exist, this is expected to throw an IllegalStateException.
+ task.start(new HashMap<>());
+ }
+
+ @Test
+ public void singleRecord() {
+ // Create the Statements that will be put by the task.
+ final ValueFactory vf = SimpleValueFactory.getInstance();
+ final Set<Statement> statements = Sets.newHashSet(
+ vf.createStatement(
+ vf.createIRI("urn:Alice"),
+ vf.createIRI("urn:WorksAt"),
+ vf.createIRI("urn:Taco Shop"),
+ vf.createIRI("urn:graph1")),
+ vf.createStatement(
+ vf.createIRI("urn:Bob"),
+ vf.createIRI("urn:TalksTo"),
+ vf.createIRI("urn:Charlie"),
+ vf.createIRI("urn:graph2")),
+ vf.createStatement(
+ vf.createIRI("urn:Eve"),
+ vf.createIRI("urn:ListensTo"),
+ vf.createIRI("urn:Alice"),
+ vf.createIRI("urn:graph1")));
+
+ // Create the task that will be tested.
+ final InMemoryRyaSinkTask task = new InMemoryRyaSinkTask();
+
+ // Set up the properties that will be used to configure the task. We don't actually need to set anything
+ // here since checkRyaInstanceExists(...) is a no-op and the task uses an in-memory RDF store.
+ final Map<String, String> props = new HashMap<>();
+
+ try {
+ // Start the task.
+ task.start(props);
+
+ // Put the statements as a SinkRecord.
+ task.put( Collections.singleton(new SinkRecord("topic", 1, null, "key", null, statements, 0)) );
+
+ // Flush the statements.
+ task.flush(new HashMap<>());
+
+ // Fetch the stored Statements to show they match the original set.
+ final Set<Statement> fetched = new HashSet<>();
+
+ final Sail sail = task.makeSail(props);
+ try(SailConnection conn = sail.getConnection();
+ CloseableIteration<? extends Statement, SailException> it = conn.getStatements(null, null, null, false)) {
+ while(it.hasNext()) {
+ fetched.add( it.next() );
+ }
+ }
+
+ assertEquals(statements, fetched);
+
+ } finally {
+ // Stop the task.
+ task.stop();
+ }
+ }
+
+ @Test
+ public void multipleRecords() {
+ // Create the Statements that will be put by the task.
+ final ValueFactory vf = SimpleValueFactory.getInstance();
+ final Set<Statement> batch1 = Sets.newHashSet(
+ vf.createStatement(
+ vf.createIRI("urn:Alice"),
+ vf.createIRI("urn:WorksAt"),
+ vf.createIRI("urn:Taco Shop"),
+ vf.createIRI("urn:graph1")),
+ vf.createStatement(
+ vf.createIRI("urn:Bob"),
+ vf.createIRI("urn:TalksTo"),
+ vf.createIRI("urn:Charlie"),
+ vf.createIRI("urn:graph2")));
+
+ final Set<Statement> batch2 = Sets.newHashSet(
+ vf.createStatement(
+ vf.createIRI("urn:Eve"),
+ vf.createIRI("urn:ListensTo"),
+ vf.createIRI("urn:Alice"),
+ vf.createIRI("urn:graph1")));
+
+ // Create the task that will be tested.
+ final InMemoryRyaSinkTask task = new InMemoryRyaSinkTask();
+
+ // Set up the properties that will be used to configure the task. We don't actually need to set anything
+ // here since checkRyaInstanceExists(...) is a no-op and the task uses an in-memory RDF store.
+ final Map<String, String> props = new HashMap<>();
+
+ try {
+ // Start the task.
+ task.start(props);
+
+ // Put the statements as SinkRecords.
+ final Collection<SinkRecord> records = Sets.newHashSet(
+ new SinkRecord("topic", 1, null, "key", null, batch1, 0),
+ new SinkRecord("topic", 1, null, "key", null, batch2, 1));
+ task.put( records );
+
+ // Flush the statements.
+ task.flush(new HashMap<>());
+
+ // Fetch the stored Statements to show they match the union of both batches.
+ final Set<Statement> fetched = new HashSet<>();
+
+ final Sail sail = task.makeSail(props);
+ try(SailConnection conn = sail.getConnection();
+ CloseableIteration<? extends Statement, SailException> it = conn.getStatements(null, null, null, false)) {
+ while(it.hasNext()) {
+ fetched.add( it.next() );
+ }
+ }
+
+ assertEquals(Sets.union(batch1, batch2), fetched);
+
+ } finally {
+ // Stop the task.
+ task.stop();
+ }
+ }
+
+ @Test
+ public void flushBetweenPuts() {
+ // Create the Statements that will be put by the task.
+ final ValueFactory vf = SimpleValueFactory.getInstance();
+ final Set<Statement> batch1 = Sets.newHashSet(
+ vf.createStatement(
+ vf.createIRI("urn:Alice"),
+ vf.createIRI("urn:WorksAt"),
+ vf.createIRI("urn:Taco Shop"),
+ vf.createIRI("urn:graph1")),
+ vf.createStatement(
+ vf.createIRI("urn:Bob"),
+ vf.createIRI("urn:TalksTo"),
+ vf.createIRI("urn:Charlie"),
+ vf.createIRI("urn:graph2")));
+
+ final Set<Statement> batch2 = Sets.newHashSet(
+ vf.createStatement(
+ vf.createIRI("urn:Eve"),
+ vf.createIRI("urn:ListensTo"),
+ vf.createIRI("urn:Alice"),
+ vf.createIRI("urn:graph1")));
+
+ // Create the task that will be tested.
+ final InMemoryRyaSinkTask task = new InMemoryRyaSinkTask();
+
+ // Set up the properties that will be used to configure the task. We don't actually need to set anything
+ // here since checkRyaInstanceExists(...) is a no-op and the task uses an in-memory RDF store.
+ final Map<String, String> props = new HashMap<>();
+
+ try {
+ // Start the task.
+ task.start(props);
+
+ // Put the statements with flushes between them.
+ task.put( Collections.singleton(new SinkRecord("topic", 1, null, "key", null, batch1, 0)) );
+ task.flush(new HashMap<>());
+ task.put( Collections.singleton(new SinkRecord("topic", 1, null, "key", null, batch2, 1)) );
+ task.flush(new HashMap<>());
+
+ // Fetch the stored Statements to show they match the union of both batches.
+ final Set<Statement> fetched = new HashSet<>();
+
+ final Sail sail = task.makeSail(props);
+ try(SailConnection conn = sail.getConnection();
+ CloseableIteration<? extends Statement, SailException> it = conn.getStatements(null, null, null, false)) {
+ while(it.hasNext()) {
+ fetched.add( it.next() );
+ }
+ }
+
+ assertEquals(Sets.union(batch1, batch2), fetched);
+
+ } finally {
+ // Stop the task.
+ task.stop();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/test/resources/simplelogger.properties
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/test/resources/simplelogger.properties b/extras/kafka.connect/api/src/test/resources/simplelogger.properties
new file mode 100644
index 0000000..1b21312
--- /dev/null
+++ b/extras/kafka.connect/api/src/test/resources/simplelogger.properties
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+org.slf4j.simpleLogger.defaultLogLevel=debug
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/README.md
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/client/README.md b/extras/kafka.connect/client/README.md
new file mode 100644
index 0000000..c7b8963
--- /dev/null
+++ b/extras/kafka.connect/client/README.md
@@ -0,0 +1,21 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+This project creates a shaded executable jar that may be used to write statements
+to, and read statements from, a Kafka topic in the format that the Rya Kafka
+Connect Sinks expect. This tool is only meant to be used for testing/debugging
+Kafka Connect integration.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/client/pom.xml b/extras/kafka.connect/client/pom.xml
new file mode 100644
index 0000000..1ffc8d6
--- /dev/null
+++ b/extras/kafka.connect/client/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.kafka.connect.parent</artifactId>
+ <version>4.0.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rya.kafka.connect.client</artifactId>
+
+ <name>Apache Rya Kafka Connect - Client</name>
+ <description>Contains a client that may be used to load Statements into
+ a Kafka topic to be read by Kafka Connect.</description>
+
+ <dependencies>
+ <!-- 1st party dependencies. -->
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.sail</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.kafka.connect.api</artifactId>
+ </dependency>
+
+ <!-- 3rd party dependencies. -->
+ <dependency>
+ <groupId>org.eclipse.rdf4j</groupId>
+ <artifactId>rdf4j-model</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.github.stephenc.findbugs</groupId>
+ <artifactId>findbugs-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+
+ <!-- Testing dependencies. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Create an executable jar for the client application. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.rya.kafka.connect.client.CLIDriver</mainClass>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java
new file mode 100644
index 0000000..7ebf083
--- /dev/null
+++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException;
+import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand;
+import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand;
+import org.eclipse.rdf4j.model.Statement;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A CLI tool used to write {@link Statement}s to, and read them from, a Kafka topic using the format
+ * the Rya Kafka Connect Sinks expect.
+ */
+@DefaultAnnotation(NonNull.class)
+public class CLIDriver {
+
+ /**
+ * Maps each command's name, as reported by {@code getCommand()}, to the object that performs the command.
+ */
+ private static final ImmutableMap<String, RyaKafkaClientCommand> COMMANDS;
+ static {
+ final Set<Class<? extends RyaKafkaClientCommand>> commandClasses = new HashSet<>();
+ commandClasses.add(ReadStatementsCommand.class);
+ commandClasses.add(WriteStatementsCommand.class);
+ final ImmutableMap.Builder<String, RyaKafkaClientCommand> builder = ImmutableMap.builder();
+ for(final Class<? extends RyaKafkaClientCommand> commandClass : commandClasses) {
+ try {
+ final RyaKafkaClientCommand command = commandClass.newInstance();
+ builder.put(command.getCommand(), command);
+ } catch (InstantiationException | IllegalAccessException e) {
+ System.err.println("Could not run the application because a RyaKafkaClientCommand is missing its empty constructor.");
+ e.printStackTrace();
+ }
+ }
+ COMMANDS = builder.build();
+ }
+
+ private static final String USAGE = makeUsage(COMMANDS);
+
+ public static void main(final String[] args) {
+ // If no command is provided or the command isn't recognized, then print the usage and exit.
+ if (args.length == 0 || !COMMANDS.containsKey(args[0])) {
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ // Fetch the command that will be executed.
+ final String command = args[0];
+ final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
+ final RyaKafkaClientCommand clientCommand = COMMANDS.get(command);
+
+ // Print usage if the arguments are invalid for the command.
+ if(!clientCommand.validArguments(commandArgs)) {
+ System.out.println(clientCommand.getUsage());
+ System.exit(1);
+ }
+
+ // Execute the command.
+ try {
+ clientCommand.execute(commandArgs);
+ } catch (ArgumentsException | ExecutionException e) {
+ System.err.println("The command: " + command + " failed to execute properly.");
+ e.printStackTrace();
+ System.exit(2);
+ }
+ }
+
+ private static String makeUsage(final ImmutableMap<String, RyaKafkaClientCommand> commands) {
+ final StringBuilder usage = new StringBuilder();
+ usage.append("Usage: ").append(CLIDriver.class.getSimpleName()).append(" <command> (<argument> ... )\n");
+ usage.append("\n");
+ usage.append("Possible Commands:\n");
+
+ // Sort the command names and find the longest one so the descriptions line up in a column.
+ final List<String> sortedCommandNames = Lists.newArrayList(commands.keySet());
+ Collections.sort(sortedCommandNames);
+
+ int maxCommandLength = 0;
+ for (final String commandName : sortedCommandNames) {
+ maxCommandLength = commandName.length() > maxCommandLength ? commandName.length() : maxCommandLength;
+ }
+
+ // Add each command to the usage.
+ final String commandFormat = " %-" + maxCommandLength + "s - %s\n";
+ for (final String commandName : sortedCommandNames) {
+ final String commandDescription = commands.get(commandName).getDescription();
+ usage.append(String.format(commandFormat, commandName, commandDescription));
+ }
+
+ return usage.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java
new file mode 100644
index 0000000..8a69a07
--- /dev/null
+++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A command that may be executed by the Rya Kafka Connect Client {@link CLIDriver}.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface RyaKafkaClientCommand {
+
+ /**
+ * Command line parameters that are used by all commands that interact with Kafka.
+ */
+ class KafkaParameters {
+
+ @Parameter(names = { "--bootstrapServers", "-b" }, description =
+ "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.")
+ public String bootstrapServers = "localhost:9092";
+
+ @Parameter(names = { "--topic", "-t" }, required = true, description = "The Kafka topic that will be interacted with.")
+ public String topic;
+ }
+
+ /**
+ * @return What a user would type into the command line to indicate
+ * they want to execute this command.
+ */
+ public String getCommand();
+
+ /**
+ * @return Briefly describes what the command does.
+ */
+ public String getDescription();
+
+ /**
+ * @return Describes what arguments may be provided to the command. By default, only the {@link KafkaParameters}.
+ */
+ default public String getUsage() {
+ final JCommander parser = new JCommander(new KafkaParameters());
+
+ final StringBuilder usage = new StringBuilder();
+ parser.usage(usage);
+ return usage.toString();
+ }
+
+ /**
+ * Validates a set of arguments that may be passed into the command.
+ *
+ * @param args - The arguments that will be validated. (not null)
+ * @return {@code true} if the arguments are valid, otherwise {@code false}.
+ */
+ public boolean validArguments(String[] args);
+
+ /**
+ * Execute the command using the command line arguments.
+ *
+ * @param args - Command line arguments that configure how the command will execute. (not null)
+ * @throws ArgumentsException There was a problem with the provided arguments.
+ * @throws ExecutionException There was a problem while executing the command.
+ */
+ public void execute(final String[] args) throws ArgumentsException, ExecutionException;
+
+ /**
+ * A {@link RyaKafkaClientCommand} could not be executed because of a problem with
+ * the arguments that were provided to it.
+ */
+ public static final class ArgumentsException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public ArgumentsException(final String message) {
+ super(message);
+ }
+
+ public ArgumentsException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * A {@link RyaKafkaClientCommand} could not be executed.
+ */
+ public static final class ExecutionException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public ExecutionException(final String message) {
+ super(message);
+ }
+
+ public ExecutionException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java
new file mode 100644
index 0000000..bf7a647
--- /dev/null
+++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.rya.kafka.connect.api.StatementsDeserializer;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
+import org.eclipse.rdf4j.model.Statement;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.ParameterException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Reads {@link Statement}s from a Kafka topic using the Rya Kafka Connect Sink format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class ReadStatementsCommand implements RyaKafkaClientCommand {
+
+ @Override
+ public String getCommand() {
+ return "read";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Reads Statements from the specified Kafka topic.";
+ }
+
+ @Override
+ public boolean validArguments(final String[] args) {
+ boolean valid = true;
+ try {
+ new JCommander(new KafkaParameters(), args);
+ } catch(final ParameterException e) {
+ valid = false;
+ }
+ return valid;
+ }
+
+ @Override
+ public void execute(final String[] args) throws ArgumentsException, ExecutionException {
+ requireNonNull(args);
+
+ // Parse the command line arguments.
+ final KafkaParameters params = new KafkaParameters();
+ try {
+ new JCommander(params, args);
+ } catch(final ParameterException e) {
+ throw new ArgumentsException("Could not read the Statements from the topic because of invalid command line parameters.", e);
+ }
+
+ // Set up the consumer.
+ try(KafkaConsumer<String, Set<Statement>> consumer = makeConsumer(params)) {
+ // Subscribe to the configured topic.
+ consumer.subscribe(Collections.singleton(params.topic));
+
+ // Read the statements returned by a single poll and print them to standard out.
+ for(final ConsumerRecord<String, Set<Statement>> record : consumer.poll(500)) {
+ for(final Statement stmt: record.value()) {
+ System.out.println( stmt );
+ }
+ }
+ }
+ }
+
+ private KafkaConsumer<String, Set<Statement>> makeConsumer(final KafkaParameters params) {
+ requireNonNull(params);
+
+ // Configure which instance of Kafka to connect to.
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, params.bootstrapServers);
+
+ // Nothing meaningful is in the key and the value is a Set<Statement> object.
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StatementsDeserializer.class);
+
+ // Use a UUID for the Group Id so that we never register as part of the same group as another consumer.
+ final String groupId = UUID.randomUUID().toString();
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+ // Set a client id so that server side logging can be traced.
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, "Kafka-Connect-Client-" + groupId);
+
+ // This consumer always starts at the beginning of the topic and its group id is never reused, so never
+ // commit the consumer's progress.
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+ return new KafkaConsumer<>(props);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java
new file mode 100644
index 0000000..83311f5
--- /dev/null
+++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.kafka.connect.api.StatementsSerializer;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
+import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.Rio;
+import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
+import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect Sink format.
+ * <p>
+ * The statements are read from an RDF file whose format is derived from the file's name. They are
+ * written to the topic in batches, where each batch is the value of a single Kafka record.
+ */
+@DefaultAnnotation(NonNull.class)
+public class WriteStatementsCommand implements RyaKafkaClientCommand {
+    private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class);
+
+    /**
+     * The maximum number of {@link Statement}s that are written to the topic within a single record.
+     */
+    private static final int BATCH_SIZE = 5;
+
+    /**
+     * Command line parameters that are used by this command to configure itself.
+     */
+    public static class WriteParameters extends KafkaParameters {
+        @Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to write to the Kafka topic.")
+        public String statementsFile;
+    }
+
+    @Override
+    public String getCommand() {
+        return "write";
+    }
+
+    @Override
+    public String getDescription() {
+        return "Writes Statements to the specified Kafka topic.";
+    }
+
+    /**
+     * @param args - The command line arguments to check.
+     * @return {@code true} if JCommander parses the arguments into {@link WriteParameters} without
+     *   reporting a {@link ParameterException}; otherwise {@code false}.
+     */
+    @Override
+    public boolean validArguments(final String[] args) {
+        try {
+            new JCommander(new WriteParameters(), args);
+            return true;
+        } catch(final ParameterException e) {
+            return false;
+        }
+    }
+
+    /**
+     * @return Describes what arguments may be provided to the command.
+     */
+    @Override
+    public String getUsage() {
+        final JCommander parser = new JCommander(new WriteParameters());
+
+        final StringBuilder usage = new StringBuilder();
+        parser.usage(usage);
+        return usage.toString();
+    }
+
+    /**
+     * Parses the RDF file named by the arguments and writes its statements to the configured Kafka topic.
+     *
+     * @param args - The command line arguments that configure the write. (not null)
+     * @throws ArgumentsException The arguments could not be parsed or the statements file does not exist.
+     * @throws ExecutionException The file could not be read or parsed, or the RDF handler failed.
+     * @throws UnsupportedRDFormatException No RDF format could be derived from the file's name.
+     */
+    @Override
+    public void execute(final String[] args) throws ArgumentsException, ExecutionException {
+        requireNonNull(args);
+
+        // Parse the command line arguments.
+        final WriteParameters params = new WriteParameters();
+        try {
+            new JCommander(params, args);
+        } catch(final ParameterException e) {
+            throw new ArgumentsException("Could not write the statements because of invalid command line parameters.", e);
+        }
+
+        // Verify the configured statements file path.
+        final Path statementsPath = Paths.get(params.statementsFile);
+        if(!statementsPath.toFile().exists()) {
+            throw new ArgumentsException("Could not load statements at path '" + statementsPath + "' because that " +
+                    "file does not exist. Make sure you've entered the correct path.");
+        }
+
+        // Create an RDF Parser whose format is derived from the statementPath's file extension.
+        final String filename = statementsPath.getFileName().toString();
+        final RDFFormat format = RdfFormatUtils.forFileName(filename);
+        if (format == null) {
+            throw new UnsupportedRDFormatException("Unknown RDF format for the file: " + filename);
+        }
+        final RDFParser parser = Rio.createParser(format);
+
+        // Set up the producer.
+        try(Producer<String, Set<Statement>> producer = makeProducer(params)) {
+            // Set a handler that writes the statements to the specified kafka topic in batches of BATCH_SIZE.
+            parser.setRDFHandler(new AbstractRDFHandler() {
+
+                private Set<Statement> batch = new HashSet<>(BATCH_SIZE);
+
+                @Override
+                public void startRDF() throws RDFHandlerException {
+                    log.trace("Starting loading statements.");
+                }
+
+                @Override
+                public void handleStatement(final Statement stmnt) throws RDFHandlerException {
+                    log.trace("Adding statement.");
+                    batch.add(stmnt);
+
+                    if(batch.size() >= BATCH_SIZE) {
+                        flushBatch();
+                    }
+                }
+
+                @Override
+                public void endRDF() throws RDFHandlerException {
+                    // Write whatever is left over from the last partial batch.
+                    if(!batch.isEmpty()) {
+                        flushBatch();
+                    }
+                    log.trace("Done.");
+                }
+
+                private void flushBatch() {
+                    log.trace("Flushing batch of size {}", batch.size());
+
+                    // The send is asynchronous, so report a failed write through the callback instead of
+                    // silently discarding it.
+                    producer.send(new ProducerRecord<>(params.topic, null, batch), (metadata, e) -> {
+                        if(e != null) {
+                            log.error("Could not write a batch of statements to the Kafka topic.", e);
+                        }
+                    });
+
+                    // The sent Set must not be modified after it has been handed to the producer, so start a new one.
+                    batch = new HashSet<>(BATCH_SIZE);
+                    producer.flush();
+                }
+            });
+
+            // Do the parse and load. The input stream is closed once parsing finishes or fails.
+            try(InputStream in = Files.newInputStream(statementsPath)) {
+                parser.parse(in, "");
+            } catch (RDFParseException | RDFHandlerException | IOException e) {
+                throw new ExecutionException("Could not load the RDF file's Statements into the Kafka topic.", e);
+            }
+        }
+    }
+
+    /**
+     * Creates the producer used to write batches of statements.
+     *
+     * @param params - Supplies the bootstrap servers of the Kafka instance to connect to. (not null)
+     * @return A producer with String keys and {@code Set<Statement>} values.
+     */
+    private static Producer<String, Set<Statement>> makeProducer(final KafkaParameters params) {
+        requireNonNull(params);
+        final Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, params.bootstrapServers);
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StatementsSerializer.class.getName());
+        return new KafkaProducer<>(props);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/client/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/client/src/main/resources/log4j.properties b/extras/kafka.connect/client/src/main/resources/log4j.properties
new file mode 100644
index 0000000..b07468c
--- /dev/null
+++ b/extras/kafka.connect/client/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo-it/README.md
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/mongo-it/README.md b/extras/kafka.connect/mongo-it/README.md
new file mode 100644
index 0000000..b154b95
--- /dev/null
+++ b/extras/kafka.connect/mongo-it/README.md
@@ -0,0 +1,19 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+This project contains integration tests that verify a Mongo DB backed
+implementation of the Rya Kafka Connect Sink is working properly.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo-it/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/mongo-it/pom.xml b/extras/kafka.connect/mongo-it/pom.xml
new file mode 100644
index 0000000..ca439ea
--- /dev/null
+++ b/extras/kafka.connect/mongo-it/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.kafka.connect.parent</artifactId>
+ <version>4.0.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rya.kafka.connect.mongo.it</artifactId>
+
+ <name>Apache Rya Kafka Connect - Mongo DB Integration Tests</name>
+ <description>Tests the Kafka Connect Sink that writes to a Rya instance backed by Mongo DB.</description>
+
+ <dependencies>
+ <!-- 1st party dependencies. -->
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.kafka.connect.mongo</artifactId>
+ </dependency>
+
+ <!-- 3rd party dependencies. -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Testing dependencies. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.test.mongo</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java b/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java
new file mode 100644
index 0000000..55e7603
--- /dev/null
+++ b/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.test.mongo.MongoITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link MongoRyaSinkTask}.
+ */
+public class MongoRyaSinkTaskIT extends MongoITBase {
+
+    @Test
+    public void instanceExists() throws Exception {
+        // Install an instance of Rya into the Mongo DB instance provided by the base class.
+        final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+                getMongoHostname(),
+                getMongoPort(),
+                Optional.empty(),
+                Optional.empty());
+
+        final InstallConfiguration installConfig = InstallConfiguration.builder()
+                .setEnableTableHashPrefix(false)
+                .setEnableEntityCentricIndex(false)
+                .setEnableFreeTextIndex(false)
+                .setEnableTemporalIndex(false)
+                .setEnablePcjIndex(false)
+                .setEnableGeoIndex(false)
+                .build();
+
+        final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, getMongoClient());
+        ryaClient.getInstall().install("rya", installConfig);
+
+        // This will pass because the Rya instance exists.
+        startThenStopTask("rya");
+    }
+
+    @Test(expected = ConnectException.class)
+    public void instanceDoesNotExist() throws Exception {
+        // Starting the task will fail because the Rya instance does not exist.
+        startThenStopTask("instance-does-not-exist");
+    }
+
+    /**
+     * Creates a {@link MongoRyaSinkTask}, starts it against the test's Mongo DB instance using the
+     * provided Rya instance name, and stops it whether or not the start succeeded.
+     *
+     * @param ryaInstanceName - The Rya instance name the task is configured with. (not null)
+     */
+    private void startThenStopTask(final String ryaInstanceName) throws Exception {
+        // Create the task that will be tested.
+        final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+        try {
+            // Configure the task to use the test's Mongo DB instance for Rya.
+            final Map<String, String> config = new HashMap<>();
+            config.put(MongoRyaSinkConfig.HOSTNAME, getMongoHostname());
+            config.put(MongoRyaSinkConfig.PORT, String.valueOf(getMongoPort()));
+            config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, ryaInstanceName);
+
+            task.start(config);
+        } finally {
+            task.stop();
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo/README.md
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/mongo/README.md b/extras/kafka.connect/mongo/README.md
new file mode 100644
index 0000000..03b2c9b
--- /dev/null
+++ b/extras/kafka.connect/mongo/README.md
@@ -0,0 +1,23 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+This project is the Rya Kafka Connect Sink that writes to Mongo DB backed
+instances of Rya.
+
+This project produces a shaded jar that may be installed into Kafka Connect.
+For more information about how to install and configure this connector, see
+[the manual](../../rya.manual/src/site/markdown/kafka-connect-integration.md).
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/mongo/pom.xml b/extras/kafka.connect/mongo/pom.xml
new file mode 100644
index 0000000..66eba1b
--- /dev/null
+++ b/extras/kafka.connect/mongo/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.kafka.connect.parent</artifactId>
+ <version>4.0.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rya.kafka.connect.mongo</artifactId>
+
+ <name>Apache Rya Kafka Connect - Mongo DB</name>
+ <description>A Kafka Connect Sink that writes to a Rya instance backed by Mongo DB.</description>
+
+ <dependencies>
+ <!-- 1st party dependencies. -->
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.kafka.connect.api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing</artifactId>
+ </dependency>
+
+ <!-- 3rd party dependencies. -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Testing dependencies. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Build the uber jar that may be deployed to Kafka Connect. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java
new file mode 100644
index 0000000..3b48556
--- /dev/null
+++ b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * The Kafka Connect configuration used by {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s.
+ * It adds the Mongo DB connection settings to the definitions contributed by {@link RyaSinkConfig}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConfig extends RyaSinkConfig {
+
+    // The configuration keys.
+    public static final String HOSTNAME = "mongo.hostname";
+    public static final String PORT = "mongo.port";
+    public static final String USERNAME = "mongo.username";
+    public static final String PASSWORD = "mongo.password";
+
+    // The documentation that accompanies each key.
+    private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections will use.";
+    private static final String PORT_DOC = "The Mongo DB port the Sail connections will use.";
+    private static final String USERNAME_DOC = "The Mongo DB username the Sail connections will use.";
+    private static final String PASSWORD_DOC = "The Mongo DB password the Sail connections will use.";
+
+    public static final ConfigDef CONFIG_DEF = makeConfigDef();
+
+    /**
+     * @return The definition holding the Mongo DB specific entries as well as the common entries
+     *   added by {@link RyaSinkConfig#addCommonDefinitions(ConfigDef)}.
+     */
+    private static ConfigDef makeConfigDef() {
+        final ConfigDef definition = new ConfigDef();
+
+        // Hostname and port have no default value; username and password default to the empty string.
+        definition.define(HOSTNAME, Type.STRING, Importance.HIGH, HOSTNAME_DOC);
+        definition.define(PORT, Type.INT, Importance.HIGH, PORT_DOC);
+        definition.define(USERNAME, Type.STRING, "", Importance.HIGH, USERNAME_DOC);
+        definition.define(PASSWORD, Type.PASSWORD, "", Importance.HIGH, PASSWORD_DOC);
+
+        RyaSinkConfig.addCommonDefinitions(definition);
+        return definition;
+    }
+
+    /**
+     * Constructs an instance of {@link MongoRyaSinkConfig}.
+     *
+     * @param originals - The key/value pairs that define the configuration. (not null)
+     */
+    public MongoRyaSinkConfig(final Map<?, ?> originals) {
+        super(CONFIG_DEF, originals);
+    }
+
+    /**
+     * @return The Mongo DB hostname the Sail connections will use.
+     */
+    public String getHostname() {
+        return super.getString(HOSTNAME);
+    }
+
+    /**
+     * @return The Mongo DB port the Sail connections will use.
+     */
+    public int getPort() {
+        return super.getInt(PORT);
+    }
+
+    /**
+     * @return The Mongo DB username the Sail connections will use.
+     */
+    public String getUsername() {
+        return super.getString(USERNAME);
+    }
+
+    /**
+     * @return The Mongo DB password the Sail connections will use, as plain text.
+     */
+    public String getPassword() {
+        return super.getPassword(PASSWORD).value();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java
new file mode 100644
index 0000000..fd91d07
--- /dev/null
+++ b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * The {@link RyaSinkConnector} for Rya instances that are backed by Mongo DB. Its tasks are
+ * {@link MongoRyaSinkTask}s and it is configured with a {@link MongoRyaSinkConfig}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConnector extends RyaSinkConnector {
+
+    // Stays null until start(Map) has been invoked.
+    @Nullable
+    private MongoRyaSinkConfig config = null;
+
+    @Override
+    public ConfigDef config() {
+        return MongoRyaSinkConfig.CONFIG_DEF;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MongoRyaSinkTask.class;
+    }
+
+    @Override
+    public void start(final Map<String, String> props) {
+        config = new MongoRyaSinkConfig(props);
+    }
+
+    /**
+     * @return The configuration that was parsed by {@link #start(Map)}.
+     * @throws IllegalStateException {@link #start(Map)} has not been invoked yet.
+     */
+    @Override
+    protected AbstractConfig getConfig() {
+        if(config != null) {
+            return config;
+        }
+        throw new IllegalStateException("The configuration has not been set yet. Invoke start(Map) first.");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
new file mode 100644
index 0000000..6887fdb
--- /dev/null
+++ b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.log.LogUtils;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+ @Nullable
+ final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+ @Nullable
+ final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+ // Connect a Mongo Client to the configured Mongo DB instance.
+ final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+ final boolean hasCredentials = username != null && password != null;
+
+ try(MongoClient mongoClient = hasCredentials ?
+ new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+ new MongoClient(serverAddr)) {
+ // Use a RyaClient to see if the configured instance exists.
+ // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+ final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+ config.getHostname(),
+ config.getPort(),
+ Optional.ofNullable(username),
+ Optional.ofNullable(password));
+
+ final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ LogUtils.clean(config.getRyaInstanceName()) + " has not been installed.");
+ }
+ } catch(final RyaClientException e) {
+ throw new ConnectException("Unable to determine if the Rya Instance named " +
+ LogUtils.clean(config.getRyaInstanceName()) + " has been installed.", e);
+ }
+ }
+
+ @Override
+ protected Sail makeSail(final Map<String, String> taskConfig) {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+
+ // Move the configuration into a Rya Configuration object.
+ final MongoDBRdfConfiguration ryaConfig = new MongoDBRdfConfiguration();
+ ConfigUtils.setUseMongo(ryaConfig, true);
+ ryaConfig.setMongoDBName( config.getRyaInstanceName() );
+ ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+ ryaConfig.setMongoHostname( config.getHostname() );
+ ryaConfig.setMongoPort( "" + config.getPort() );
+
+ if(!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) {
+ ryaConfig.setMongoUser( config.getUsername() );
+ ryaConfig.setMongoPassword( config.getPassword() );
+ }
+
+ // Create the Sail object.
+ try {
+ return RyaSailFactory.getInstance(ryaConfig);
+ } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) {
+ throw new ConnectException("Could not connect to the Rya Instance named " + config.getRyaInstanceName(), e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/mongo/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfigTest.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/mongo/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfigTest.java b/extras/kafka.connect/mongo/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfigTest.java
new file mode 100644
index 0000000..d6c7c96
--- /dev/null
+++ b/extras/kafka.connect/mongo/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfigTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+import org.junit.Test;
+
+/**
+ * Exercises construction of {@link MongoRyaSinkConfig}.
+ */
+public class MongoRyaSinkConfigTest {
+
+    /**
+     * A {@link MongoRyaSinkConfig} must be constructible from a map that holds
+     * the Rya instance name and the Mongo connection settings.
+     */
+    @Test
+    public void parses() {
+        // Populate the Rya instance name along with the Mongo connection settings.
+        final Map<String, String> settings = new HashMap<>();
+        settings.put(RyaSinkConfig.RYA_INSTANCE_NAME, "rya");
+        settings.put(MongoRyaSinkConfig.USERNAME, "alice");
+        settings.put(MongoRyaSinkConfig.PASSWORD, "alice1234!@");
+        settings.put(MongoRyaSinkConfig.HOSTNAME, "127.0.0.1");
+        settings.put(MongoRyaSinkConfig.PORT, "27017");
+
+        // There is nothing to assert; the test fails only if construction throws.
+        new MongoRyaSinkConfig(settings);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/pom.xml b/extras/kafka.connect/pom.xml
new file mode 100644
index 0000000..9a9702c
--- /dev/null
+++ b/extras/kafka.connect/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <!-- Inherits from the Rya extras parent project. -->
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.extras</artifactId>
+ <version>4.0.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rya.kafka.connect.parent</artifactId>
+
+ <name>Apache Rya Kafka Connect Parent</name>
+ <description>The parent pom file for any Rya Kafka Connect project.</description>
+
+ <!-- Packaged as a pom: this project only aggregates the modules listed below. -->
+ <packaging>pom</packaging>
+
+ <!-- Child modules that are built as part of this project. -->
+ <modules>
+ <module>api</module>
+ <module>accumulo</module>
+ <module>accumulo-it</module>
+ <module>mongo</module>
+ <module>mongo-it</module>
+ <module>client</module>
+ </modules>
+
+ <!-- Defines the kafka.version property. NOTE(review): presumably referenced by the
+      child modules' Kafka dependencies; it is not used within this file. Confirm in those poms. -->
+ <properties>
+ <kafka.version>1.1.0</kafka.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <!-- Adds a Build-Version entry holding the project version to the jar manifest. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Build-Version>${project.version}</Build-Version>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/pom.xml
----------------------------------------------------------------------
diff --git a/extras/pom.xml b/extras/pom.xml
index bb6f914..65dd4cc 100644
--- a/extras/pom.xml
+++ b/extras/pom.xml
@@ -45,6 +45,7 @@ under the License.
<module>rya.merger</module>
<module>rya.streams</module>
<module>rya.forwardchain</module>
+ <module>kafka.connect</module>
</modules>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/rya.manual/src/site/markdown/_index.md
----------------------------------------------------------------------
diff --git a/extras/rya.manual/src/site/markdown/_index.md b/extras/rya.manual/src/site/markdown/_index.md
index d0b61c4..07dfe50 100644
--- a/extras/rya.manual/src/site/markdown/_index.md
+++ b/extras/rya.manual/src/site/markdown/_index.md
@@ -33,6 +33,7 @@ This project contains documentation about Apache Rya, a scalable RDF triple stor
- [Inferencing](infer.md)
- [MapReduce Interface](mapreduce.md)
- [Rya Streams](rya-streams.md)
+- [Kafka Connect Integration](kafka-connect-integration.md)
# Samples
- [Typical First Steps](sm-firststeps.md)
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/rya.manual/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/extras/rya.manual/src/site/markdown/index.md b/extras/rya.manual/src/site/markdown/index.md
index 5372618..e686736 100644
--- a/extras/rya.manual/src/site/markdown/index.md
+++ b/extras/rya.manual/src/site/markdown/index.md
@@ -35,6 +35,7 @@ This project contains documentation about Apache Rya, a scalable RDF triple stor
- [Shell Interface](shell.md)
- [Incremental Join Maintenance Application (PCJ Updater)](pcj-updater.md)
- [Rya Streams](rya-streams.md)
+- [Kafka Connect Integration](kafka-connect-integration.md)
# Samples
- [Typical First Steps](sm-firststeps.md)