Posted to commits@rya.apache.org by pu...@apache.org on 2018/05/16 15:53:21 UTC

[3/3] incubator-rya git commit: RYA-487 Closes #296, Implement Kafka Connect Sink implementations for Accumulo and Mongo DB backed Rya.

RYA-487 Closes #296, Implement Kafka Connect Sink implementations for Accumulo and Mongo DB backed Rya.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/af736749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/af736749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/af736749

Branch: refs/heads/master
Commit: af736749a375c54fc09efddbc749395f3e743937
Parents: ea91e26
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Apr 17 15:10:26 2018 -0400
Committer: Valiyil <Pu...@parsons.com>
Committed: Wed May 16 11:52:55 2018 -0400

----------------------------------------------------------------------
 .../rya/accumulo/AccumuloRdfConfiguration.java  |  96 ++--
 .../rya/mongodb/MongoDBRdfConfiguration.java    |   8 +-
 .../rya/indexing/accumulo/ConfigUtils.java      |   3 +
 extras/kafka.connect/README.md                  |  22 +
 extras/kafka.connect/accumulo-it/README.md      |  19 +
 extras/kafka.connect/accumulo-it/pom.xml        |  62 +++
 .../connect/accumulo/AccumuloRyaSinkTaskIT.java | 100 ++++
 extras/kafka.connect/accumulo/README.md         |  23 +
 extras/kafka.connect/accumulo/pom.xml           |  79 +++
 .../connect/accumulo/AccumuloRyaSinkConfig.java |  97 ++++
 .../accumulo/AccumuloRyaSinkConnector.java      |  66 +++
 .../connect/accumulo/AccumuloRyaSinkTask.java   | 112 +++++
 .../accumulo/AccumuloRyaSinkConfigTest.java     |  42 ++
 extras/kafka.connect/api/README.md              |  20 +
 extras/kafka.connect/api/pom.xml                |  96 ++++
 .../kafka/connect/api/StatementsConverter.java  |  62 +++
 .../connect/api/StatementsDeserializer.java     |  87 ++++
 .../rya/kafka/connect/api/StatementsSerde.java  |  57 +++
 .../kafka/connect/api/StatementsSerializer.java |  77 +++
 .../kafka/connect/api/sink/RyaSinkConfig.java   |  67 +++
 .../connect/api/sink/RyaSinkConnector.java      |  69 +++
 .../rya/kafka/connect/api/sink/RyaSinkTask.java | 145 ++++++
 .../kafka/connect/api/StatementsSerdeTest.java  |  84 ++++
 .../kafka/connect/api/sink/RyaSinkTaskTest.java | 264 ++++++++++
 .../src/test/resources/simplelogger.properties  |  17 +
 extras/kafka.connect/client/README.md           |  21 +
 extras/kafka.connect/client/pom.xml             | 113 +++++
 .../rya/kafka/connect/client/CLIDriver.java     | 121 +++++
 .../connect/client/RyaKafkaClientCommand.java   | 115 +++++
 .../client/command/ReadStatementsCommand.java   | 120 +++++
 .../client/command/WriteStatementsCommand.java  | 187 +++++++
 .../client/src/main/resources/log4j.properties  |  27 +
 extras/kafka.connect/mongo-it/README.md         |  19 +
 extras/kafka.connect/mongo-it/pom.xml           |  62 +++
 .../kafka/connect/mongo/MongoRyaSinkTaskIT.java |  95 ++++
 extras/kafka.connect/mongo/README.md            |  23 +
 extras/kafka.connect/mongo/pom.xml              |  79 +++
 .../kafka/connect/mongo/MongoRyaSinkConfig.java |  94 ++++
 .../connect/mongo/MongoRyaSinkConnector.java    |  63 +++
 .../kafka/connect/mongo/MongoRyaSinkTask.java   | 123 +++++
 .../connect/mongo/MongoRyaSinkConfigTest.java   |  42 ++
 extras/kafka.connect/pom.xml                    |  66 +++
 extras/pom.xml                                  |   1 +
 extras/rya.manual/src/site/markdown/_index.md   |   1 +
 extras/rya.manual/src/site/markdown/index.md    |   1 +
 .../site/markdown/kafka-connect-integration.md  | 493 +++++++++++++++++++
 extras/rya.manual/src/site/site.xml             |   3 +-
 pom.xml                                         |  52 +-
 48 files changed, 3637 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
index ed76b4a..cbfe2ea 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
@@ -62,14 +62,14 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
         super();
     }
 
-    public AccumuloRdfConfiguration(Configuration other) {
+    public AccumuloRdfConfiguration(final Configuration other) {
         super(other);
     }
 
-    public AccumuloRdfConfigurationBuilder getBuilder() {
+    public static AccumuloRdfConfigurationBuilder getBuilder() {
     	return new AccumuloRdfConfigurationBuilder();
     }
-    
+
     /**
      * Creates an AccumuloRdfConfiguration object from a Properties file.  This method assumes
      * that all values in the Properties file are Strings and that the Properties file uses the keys below.
@@ -94,26 +94,26 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
      * @param props - Properties file containing Accumulo specific configuration parameters
      * @return AccumuloRdfConfiguration with properties set
      */
-    
-    public static AccumuloRdfConfiguration fromProperties(Properties props) {
+
+    public static AccumuloRdfConfiguration fromProperties(final Properties props) {
     	return AccumuloRdfConfigurationBuilder.fromProperties(props).build();
     }
-    
+
     @Override
     public AccumuloRdfConfiguration clone() {
         return new AccumuloRdfConfiguration(this);
     }
-    
+
     /**
      * Sets the Accumulo username from the configuration object that is meant to
      * be used when connecting a {@link Connector} to Accumulo.
      *
      */
-    public void setAccumuloUser(String user) {
+    public void setAccumuloUser(final String user) {
     	Preconditions.checkNotNull(user);
     	set(CLOUDBASE_USER, user);
     }
-    
+
     /**
      * Get the Accumulo username from the configuration object that is meant to
      * be used when connecting a {@link Connector} to Accumulo.
@@ -121,19 +121,19 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
      * @return The username if one could be found; otherwise {@code null}.
      */
     public String getAccumuloUser(){
-    	return get(CLOUDBASE_USER); 
+    	return get(CLOUDBASE_USER);
     }
-    
+
     /**
      * Sets the Accumulo password from the configuration object that is meant to
      * be used when connecting a {@link Connector} to Accumulo.
      *
      */
-    public void setAccumuloPassword(String password) {
+    public void setAccumuloPassword(final String password) {
     	Preconditions.checkNotNull(password);
     	set(CLOUDBASE_PASSWORD, password);
     }
-    
+
     /**
      * Get the Accumulo password from the configuration object that is meant to
      * be used when connecting a {@link Connector} to Accumulo.
@@ -143,18 +143,18 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     public String getAccumuloPassword() {
     	return get(CLOUDBASE_PASSWORD);
     }
-    
+
     /**
      * Sets a comma delimited list of the names of the Zookeeper servers from
      * the configuration object that is meant to be used when connecting a
      * {@link Connector} to Accumulo.
      *
      */
-    public void setAccumuloZookeepers(String zookeepers) {
+    public void setAccumuloZookeepers(final String zookeepers) {
     	Preconditions.checkNotNull(zookeepers);
     	set(CLOUDBASE_ZOOKEEPERS, zookeepers);
     }
-    
+
     /**
      * Get a comma delimited list of the names of the Zookeeper servers from
      * the configuration object that is meant to be used when connecting a
@@ -165,17 +165,17 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     public String getAccumuloZookeepers() {
     	return get(CLOUDBASE_ZOOKEEPERS);
     }
-    
+
     /**
      * Sets the Accumulo instance name from the configuration object that is
      * meant to be used when connecting a {@link Connector} to Accumulo.
      *
      */
-    public void setAccumuloInstance(String instance) {
+    public void setAccumuloInstance(final String instance) {
     	Preconditions.checkNotNull(instance);
     	set(CLOUDBASE_INSTANCE, instance);
     }
-    
+
     /**
      * Get the Accumulo instance name from the configuration object that is
      * meant to be used when connecting a {@link Connector} to Accumulo.
@@ -185,15 +185,15 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     public String getAccumuloInstance() {
     	return get(CLOUDBASE_INSTANCE);
     }
-    
+
     /**
      * Tells the Rya instance to use a Mock instance of Accumulo as its backing.
      *
      */
-    public void setUseMockAccumulo(boolean useMock) {
+    public void setUseMockAccumulo(final boolean useMock) {
     	setBoolean(USE_MOCK_INSTANCE, useMock);
     }
-    
+
     /**
      * Indicates that a Mock instance of Accumulo is being used to back the Rya instance.
      *
@@ -202,12 +202,12 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     public boolean getUseMockAccumulo() {
     	return getBoolean(USE_MOCK_INSTANCE, false);
     }
-    
+
 
     /**
      * @param enabled - {@code true} if the Rya instance is backed by a mock Accumulo; otherwise {@code false}.
      */
-    public void useMockInstance(boolean enabled) {
+    public void useMockInstance(final boolean enabled) {
         super.setBooleanIfUnset(USE_MOCK_INSTANCE, enabled);
     }
 
@@ -224,7 +224,7 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
      * @param username - The Accumulo username from the configuration object that is meant to
      *   be used when connecting a {@link Connector} to Accumulo.
      */
-    public void setUsername(String username) {
+    public void setUsername(final String username) {
         super.set(CLOUDBASE_USER, username);
     }
 
@@ -242,7 +242,7 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
      * @param password - The Accumulo password from the configuration object that is meant to
      * be used when connecting a {@link Connector} to Accumulo.
      */
-    public void setPassword(String password) {
+    public void setPassword(final String password) {
         super.set(CLOUDBASE_PASSWORD, password);
     }
 
@@ -260,7 +260,7 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
      * @param instanceName - The Accumulo instance name from the configuration object that is
      * meant to be used when connecting a {@link Connector} to Accumulo.
      */
-    public void setInstanceName(String instanceName) {
+    public void setInstanceName(final String instanceName) {
         super.set(CLOUDBASE_INSTANCE, instanceName);
     }
 
@@ -279,7 +279,7 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
      * the configuration object that is meant to be used when connecting a
      * {@link Connector} to Accumulo.
      */
-    public void setZookeepers(String zookeepers) {
+    public void setZookeepers(final String zookeepers) {
         super.set(CLOUDBASE_ZOOKEEPERS, zookeepers);
     }
 
@@ -295,14 +295,14 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     }
 
     public Authorizations getAuthorizations() {
-        String[] auths = getAuths();
+        final String[] auths = getAuths();
         if (auths == null || auths.length == 0) {
             return AccumuloRdfConstants.ALL_AUTHORIZATIONS;
         }
         return new Authorizations(auths);
     }
 
-    public void setMaxRangesForScanner(Integer max) {
+    public void setMaxRangesForScanner(final Integer max) {
         setInt(MAXRANGES_SCANNER, max);
     }
 
@@ -310,9 +310,9 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
         return getInt(MAXRANGES_SCANNER, 2);
     }
 
-    public void setAdditionalIndexers(Class<? extends AccumuloIndexer>... indexers) {
-        List<String> strs = Lists.newArrayList();
-        for (Class<? extends AccumuloIndexer> ai : indexers){
+    public void setAdditionalIndexers(final Class<? extends AccumuloIndexer>... indexers) {
+        final List<String> strs = Lists.newArrayList();
+        for (final Class<? extends AccumuloIndexer> ai : indexers){
             strs.add(ai.getName());
         }
 
@@ -326,25 +326,25 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
         return getBoolean(CONF_FLUSH_EACH_UPDATE, true);
     }
 
-    public void setFlush(boolean flush){
+    public void setFlush(final boolean flush){
         setBoolean(CONF_FLUSH_EACH_UPDATE, flush);
     }
 
-    public void setAdditionalIterators(IteratorSetting... additionalIterators){
+    public void setAdditionalIterators(final IteratorSetting... additionalIterators){
         //TODO do we need to worry about cleaning up
         this.set(ITERATOR_SETTINGS_SIZE, Integer.toString(additionalIterators.length));
         int i = 0;
-        for(IteratorSetting iterator : additionalIterators) {
+        for(final IteratorSetting iterator : additionalIterators) {
             this.set(String.format(ITERATOR_SETTINGS_NAME, i), iterator.getName());
             this.set(String.format(ITERATOR_SETTINGS_CLASS, i), iterator.getIteratorClass());
             this.set(String.format(ITERATOR_SETTINGS_PRIORITY, i), Integer.toString(iterator.getPriority()));
-            Map<String, String> options = iterator.getOptions();
+            final Map<String, String> options = iterator.getOptions();
 
             this.set(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i), Integer.toString(options.size()));
-            Iterator<Entry<String, String>> it = options.entrySet().iterator();
+            final Iterator<Entry<String, String>> it = options.entrySet().iterator();
             int j = 0;
             while(it.hasNext()) {
-                Entry<String, String> item = it.next();
+                final Entry<String, String> item = it.next();
                 this.set(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j), item.getKey());
                 this.set(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j), item.getValue());
                 j++;
@@ -354,22 +354,22 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     }
 
     public IteratorSetting[] getAdditionalIterators(){
-        int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0"));
+        final int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0"));
         if(size == 0) {
             return new IteratorSetting[0];
         }
 
-        IteratorSetting[] settings = new IteratorSetting[size];
+        final IteratorSetting[] settings = new IteratorSetting[size];
         for(int i = 0; i < size; i++) {
-            String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i));
-            String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i));
-            int priority = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i)));
+            final String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i));
+            final String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i));
+            final int priority = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i)));
 
-            int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i)));
-            Map<String, String> options = new HashMap<>(optionsSize);
+            final int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i)));
+            final Map<String, String> options = new HashMap<>(optionsSize);
             for(int j = 0; j < optionsSize; j++) {
-                String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j));
-                String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j));
+                final String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j));
+                final String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j));
                 options.put(key, value);
             }
             settings[i] = new IteratorSetting(priority, name, iteratorClass, options);
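
A minimal sketch of the round trip implemented above (assumed usage, not part
of this commit's diff): setAdditionalIterators() flattens each IteratorSetting
into indexed configuration keys, and getAdditionalIterators() rebuilds the
settings from those keys. The VersioningIterator class name is only an
illustrative choice.

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.rya.accumulo.AccumuloRdfConfiguration;

    final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
    final IteratorSetting setting = new IteratorSetting(20, "vers",
            "org.apache.accumulo.core.iterators.user.VersioningIterator");

    // Writes the ITERATOR_SETTINGS_* keys that describe the setting.
    conf.setAdditionalIterators(setting);

    // Reads those keys back into an equivalent IteratorSetting.
    final IteratorSetting[] restored = conf.getAdditionalIterators();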

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
index d49f2ee..b207d79 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
@@ -274,17 +274,17 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
      * on their child subtrees.
      * @param value whether to use aggregation pipeline optimization.
      */
-    public void setUseAggregationPipeline(boolean value) {
+    public void setUseAggregationPipeline(final boolean value) {
         setBoolean(USE_AGGREGATION_PIPELINE, value);
     }
 
     @Override
     public List<Class<QueryOptimizer>> getOptimizers() {
-        List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
+        final List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
         if (getUseAggregationPipeline()) {
-            Class<?> cl = AggregationPipelineQueryOptimizer.class;
+            final Class<?> cl = AggregationPipelineQueryOptimizer.class;
             @SuppressWarnings("unchecked")
-            Class<QueryOptimizer> optCl = (Class<QueryOptimizer>) cl;
+            final Class<QueryOptimizer> optCl = (Class<QueryOptimizer>) cl;
             optimizers.add(optCl);
         }
         return optimizers;
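
A short hedged sketch of the behavior above (assumed usage; the types are the
ones imported by MongoDBRdfConfiguration itself): once the aggregation
pipeline flag is set, getOptimizers() appends AggregationPipelineQueryOptimizer
to the optimizer list.

    final MongoDBRdfConfiguration conf = new MongoDBRdfConfiguration();
    conf.setUseAggregationPipeline(true);

    // The returned list now ends with AggregationPipelineQueryOptimizer.class.
    final List<Class<QueryOptimizer>> optimizers = conf.getOptimizers();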

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
index d2fe58a..77c77cd 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
@@ -438,6 +438,9 @@ public class ConfigUtils {
         return Optional.fromNullable(conf.get(FLUO_APP_NAME));
     }
 
+    public static void setUseMongo(final Configuration conf, final boolean useMongo) {
+        conf.setBoolean(USE_MONGO, useMongo);
+    }
 
     public static boolean getUseMongo(final Configuration conf) {
         return conf.getBoolean(USE_MONGO, false);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/README.md
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/README.md b/extras/kafka.connect/README.md
new file mode 100644
index 0000000..03b63c2
--- /dev/null
+++ b/extras/kafka.connect/README.md
@@ -0,0 +1,22 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+The parent project for all Rya Kafka Connect work. All projects that are part
+of the Rya Kafka Connect system must use this project's pom as their parent pom.
+
+For more information about Rya's Kafka Connect support, see
+[the manual](../rya.manual/src/site/markdown/kafka-connect-integration.md).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo-it/README.md
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/accumulo-it/README.md b/extras/kafka.connect/accumulo-it/README.md
new file mode 100644
index 0000000..abcc12d
--- /dev/null
+++ b/extras/kafka.connect/accumulo-it/README.md
@@ -0,0 +1,19 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+This project contains integration tests that verify that an Accumulo-backed
+implementation of the Rya Kafka Connect Sink works properly.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo-it/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/accumulo-it/pom.xml b/extras/kafka.connect/accumulo-it/pom.xml
new file mode 100644
index 0000000..af088a9
--- /dev/null
+++ b/extras/kafka.connect/accumulo-it/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.kafka.connect.parent</artifactId>
+        <version>4.0.0-incubating-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>rya.kafka.connect.accumulo.it</artifactId>
+
+    <name>Apache Rya Kafka Connect - Accumulo Integration Tests</name>
+    <description>Tests the Kafka Connect Sink that writes to a Rya instance backed by Accumulo.</description>
+    
+    <dependencies>
+        <!-- 1st party dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.kafka.connect.accumulo</artifactId>
+        </dependency>
+        
+        <!-- 3rd party dependencies. -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.test.accumulo</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java b/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java
new file mode 100644
index 0000000..1775a74
--- /dev/null
+++ b/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.test.accumulo.AccumuloITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests for the methods of {@link AccumuloRyaSinkTask}.
+ */
+public class AccumuloRyaSinkTaskIT extends AccumuloITBase {
+
+    @Test
+    public void instanceExists() throws Exception {
+        // Install an instance of Rya.
+        final String ryaInstanceName = getRyaInstanceName();
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                getUsername(),
+                getPassword().toCharArray(),
+                getInstanceName(),
+                getZookeepers());
+
+        final InstallConfiguration installConfig = InstallConfiguration.builder()
+                .setEnableTableHashPrefix(false)
+                .setEnableEntityCentricIndex(false)
+                .setEnableFreeTextIndex(false)
+                .setEnableTemporalIndex(false)
+                .setEnablePcjIndex(false)
+                .setEnableGeoIndex(false)
+                .build();
+
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+        ryaClient.getInstall().install(ryaInstanceName, installConfig);
+
+        // Create the task that will be tested.
+        final AccumuloRyaSinkTask task = new AccumuloRyaSinkTask();
+
+        try {
+            // Configure the task to use the embedded Accumulo instance for Rya.
+            final Map<String, String> config = new HashMap<>();
+            config.put(AccumuloRyaSinkConfig.ZOOKEEPERS, getZookeepers());
+            config.put(AccumuloRyaSinkConfig.CLUSTER_NAME, getInstanceName());
+            config.put(AccumuloRyaSinkConfig.USERNAME, getUsername());
+            config.put(AccumuloRyaSinkConfig.PASSWORD, getPassword());
+            config.put(AccumuloRyaSinkConfig.RYA_INSTANCE_NAME, ryaInstanceName);
+
+            // This will pass because the Rya instance exists.
+            task.start(config);
+
+        } finally {
+            task.stop();
+        }
+    }
+
+    @Test(expected = ConnectException.class)
+    public void instanceDoesNotExist() throws Exception {
+        // Create the task that will be tested.
+        final AccumuloRyaSinkTask task = new AccumuloRyaSinkTask();
+
+        try {
+            // Configure the task to use the embedded Accumulo instance for Rya.
+            final Map<String, String> config = new HashMap<>();
+            config.put(AccumuloRyaSinkConfig.ZOOKEEPERS, getZookeepers());
+            config.put(AccumuloRyaSinkConfig.CLUSTER_NAME, getInstanceName());
+            config.put(AccumuloRyaSinkConfig.USERNAME, getUsername());
+            config.put(AccumuloRyaSinkConfig.PASSWORD, getPassword());
+            config.put(AccumuloRyaSinkConfig.RYA_INSTANCE_NAME, getRyaInstanceName());
+
+            // Starting the task will fail because the Rya instance does not exist.
+            task.start(config);
+
+        } finally {
+            task.stop();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo/README.md
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/accumulo/README.md b/extras/kafka.connect/accumulo/README.md
new file mode 100644
index 0000000..eecfd21
--- /dev/null
+++ b/extras/kafka.connect/accumulo/README.md
@@ -0,0 +1,23 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+This project is the Rya Kafka Connect Sink that writes to Accumulo-backed
+instances of Rya.
+
+This project produces a shaded jar that may be installed into Kafka Connect. 
+For more information about how to install and configure this connector, see
+[the manual](../../rya.manual/src/site/markdown/kafka-connect-integration.md).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/accumulo/pom.xml b/extras/kafka.connect/accumulo/pom.xml
new file mode 100644
index 0000000..54188db
--- /dev/null
+++ b/extras/kafka.connect/accumulo/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.kafka.connect.parent</artifactId>
+        <version>4.0.0-incubating-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>rya.kafka.connect.accumulo</artifactId>
+
+    <name>Apache Rya Kafka Connect - Accumulo</name>
+    <description>A Kafka Connect Sink that writes to a Rya instance backed by Accumulo.</description>
+    
+    <dependencies>
+        <!-- 1st party dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.kafka.connect.api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing</artifactId>
+        </dependency>
+        
+        <!-- 3rd party dependencies. -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <!-- Build the uber jar that may be deployed to Kafka Connect. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java
new file mode 100644
index 0000000..8db4f1c
--- /dev/null
+++ b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka Connect configuration that is used to configure {@link AccumuloRyaSinkConnector}s
+ * and {@link AccumuloRyaSinkTask}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkConfig extends RyaSinkConfig {
+
+    public static final String ZOOKEEPERS = "accumulo.zookeepers";
+    private static final String ZOOKEEPERS_DOC = "A comma delimited list of the Zookeeper server hostname/port pairs.";
+
+    public static final String CLUSTER_NAME = "accumulo.cluster.name";
+    private static final String CLUSTER_NAME_DOC = "The name of the Accumulo instance within Zookeeper.";
+
+    public static final String USERNAME = "accumulo.username";
+    private static final String USERNAME_DOC = "The Accumulo username the Sail connections will use.";
+
+    public static final String PASSWORD = "accumulo.password";
+    private static final String PASSWORD_DOC = "The Accumulo password the Sail connections will use.";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ZOOKEEPERS, Type.STRING, Importance.HIGH, ZOOKEEPERS_DOC)
+            .define(CLUSTER_NAME, Type.STRING, Importance.HIGH, CLUSTER_NAME_DOC)
+            .define(USERNAME, Type.STRING, Importance.HIGH, USERNAME_DOC)
+            .define(PASSWORD, Type.PASSWORD, Importance.HIGH, PASSWORD_DOC);
+    static {
+        RyaSinkConfig.addCommonDefinitions(CONFIG_DEF);
+    }
+
+    /**
+     * Constructs an instance of {@link AccumuloRyaSinkConfig}.
+     *
+     * @param originals - The key/value pairs that define the configuration. (not null)
+     */
+    public AccumuloRyaSinkConfig(final Map<?, ?> originals) {
+        super(CONFIG_DEF, requireNonNull(originals));
+    }
+
+    /**
+     * @return A comma delimited list of the Zookeeper server hostname/port pairs.
+     */
+    public String getZookeepers() {
+        return super.getString(ZOOKEEPERS);
+    }
+
+    /**
+     * @return The name of the Accumulo instance within Zookeeper.
+     */
+    public String getClusterName() {
+        return super.getString(CLUSTER_NAME);
+    }
+
+    /**
+     * @return The Accumulo username the Sail connections will use.
+     */
+    public String getUsername() {
+        return super.getString(USERNAME);
+    }
+
+    /**
+     * @return The Accumulo password the Sail connections will use.
+     */
+    public String getPassword() {
+        return super.getPassword(PASSWORD).value();
+    }
+}
\ No newline at end of file
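
A hedged configuration sketch (the values are hypothetical; the Rya instance
name key is the common definition that addCommonDefinitions() contributes via
RyaSinkConfig.RYA_INSTANCE_NAME):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;

    final Map<String, String> props = new HashMap<>();
    props.put(AccumuloRyaSinkConfig.ZOOKEEPERS, "zoo1:2181,zoo2:2181");
    props.put(AccumuloRyaSinkConfig.CLUSTER_NAME, "test");
    props.put(AccumuloRyaSinkConfig.USERNAME, "alice");
    props.put(AccumuloRyaSinkConfig.PASSWORD, "alice1234!@");
    props.put(RyaSinkConfig.RYA_INSTANCE_NAME, "rya_");

    // Typed getters expose the values; the password is held as a masked
    // ConfigDef Password and unwrapped with value().
    final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(props);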

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java
new file mode 100644
index 0000000..eeb3d75
--- /dev/null
+++ b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkConnector extends RyaSinkConnector {
+
+    @Nullable
+    private AccumuloRyaSinkConfig config = null;
+
+    @Override
+    public void start(final Map<String, String> props) {
+        requireNonNull(props);
+        this.config = new AccumuloRyaSinkConfig( props );
+    }
+
+    @Override
+    protected AbstractConfig getConfig() {
+        if(config == null) {
+            throw new IllegalStateException("The configuration has not been set yet. Invoke start(Map) first.");
+        }
+        return config;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return AccumuloRyaSinkTask.class;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return AccumuloRyaSinkConfig.CONFIG_DEF;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
new file mode 100644
index 0000000..7d19f29
--- /dev/null
+++ b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.log.LogUtils;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (final AccumuloException | AccumuloSecurityException e) {
+            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+        }
+
+        // Use a RyaClient to see if the configured instance exists.
+        try {
+            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                    config.getUsername(),
+                    config.getPassword().toCharArray(),
+                    config.getClusterName(),
+                    config.getZookeepers());
+            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        LogUtils.clean(config.getRyaInstanceName()) + " has not been installed.");
+            }
+
+        } catch (final RyaClientException e) {
+            throw new ConnectException("Unable to determine if the Rya Instance named " +
+                    LogUtils.clean(config.getRyaInstanceName()) + " has been installed.", e);
+        }
+    }
+
+    @Override
+    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Move the configuration into a Rya Configuration object.
+        final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration();
+        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+        ryaConfig.setAccumuloZookeepers( config.getZookeepers() );
+        ryaConfig.setAccumuloInstance( config.getClusterName() );
+        ryaConfig.setAccumuloUser( config.getUsername() );
+        ryaConfig.setAccumuloPassword( config.getPassword() );
+
+        // Create the Sail object.
+        try {
+            return RyaSailFactory.getInstance(ryaConfig);
+        } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) {
+            throw new ConnectException("Could not connect to the Rya Instance named " + config.getRyaInstanceName(), e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java b/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java
new file mode 100644
index 0000000..66ecd87
--- /dev/null
+++ b/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link AccumuloRyaSinkConfig}.
+ */
+public class AccumuloRyaSinkConfigTest {
+
+    @Test
+    public void parses() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(AccumuloRyaSinkConfig.ZOOKEEPERS, "zoo1:2181,zoo2");
+        properties.put(AccumuloRyaSinkConfig.CLUSTER_NAME, "test");
+        properties.put(AccumuloRyaSinkConfig.USERNAME, "alice");
+        properties.put(AccumuloRyaSinkConfig.PASSWORD, "alice1234!@");
+        properties.put(RyaSinkConfig.RYA_INSTANCE_NAME, "rya_");
+        new AccumuloRyaSinkConfig(properties);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/README.md
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/README.md b/extras/kafka.connect/api/README.md
new file mode 100644
index 0000000..777fd2a
--- /dev/null
+++ b/extras/kafka.connect/api/README.md
@@ -0,0 +1,20 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+This project contains the common components of a Rya Kafka Connect Sink. Each
+database backend that Rya runs on must provide a sink implementation built
+from this project's components.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/pom.xml b/extras/kafka.connect/api/pom.xml
new file mode 100644
index 0000000..3727394
--- /dev/null
+++ b/extras/kafka.connect/api/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.kafka.connect.parent</artifactId>
+        <version>4.0.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.kafka.connect.api</artifactId>
+
+    <name>Apache Rya Kafka Connect - API</name>
+    <description>Contains common components used when implementing a Kafka Connect Sink
+                 that writes to a Rya instance.</description>
+
+    <dependencies>
+        <!-- 1st party dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.api.model</artifactId>
+        </dependency>
+    
+        <!-- 3rd party dependencies. -->
+        <dependency>
+            <groupId>org.eclipse.rdf4j</groupId>
+            <artifactId>rdf4j-rio-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.rdf4j</groupId>
+            <artifactId>rdf4j-rio-binary</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.rdf4j</groupId>
+            <artifactId>rdf4j-rio-datatypes</artifactId>
+        </dependency>        
+        <dependency>
+            <groupId>com.github.stephenc.findbugs</groupId>
+            <artifactId>findbugs-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jcabi</groupId>
+            <artifactId>jcabi-manifests</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.rdf4j</groupId>
+            <artifactId>rdf4j-runtime</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java
new file mode 100644
index 0000000..eb4b611
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+import org.eclipse.rdf4j.model.Statement;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A plugin into the Kafka Connect platform that converts {@link Set}s of {@link Statement}s
+ * to/from byte[]s by using a {@link StatementsSerializer} and a {@link StatementsDeserializer}.
+ * <p/>
+ * This converter does not use Kafka's Schema Registry.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsConverter implements Converter {
+
+    private static final StatementsSerializer SERIALIZER = new StatementsSerializer();
+    private static final StatementsDeserializer DESERIALIZER = new StatementsDeserializer();
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // This converter's behavior cannot be tuned with configurations.
+    }
+
+    @Override
+    public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
+        requireNonNull(value);
+        return SERIALIZER.serialize(topic, (Set<Statement>) value);
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(final String topic, final byte[] value) {
+        requireNonNull(value);
+        return new SchemaAndValue(null, DESERIALIZER.deserialize(topic, value));
+    }
+}
\ No newline at end of file
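
The converter above simply delegates to the serializer and deserializer below. A minimal
round-trip sketch (not part of this patch; the topic name and statement values are
illustrative) of how it maps a statement set to bytes and back:

import java.util.Collections;
import java.util.Set;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.rya.kafka.connect.api.StatementsConverter;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;

public class StatementsConverterSketch {
    public static void main(final String[] args) {
        final ValueFactory vf = SimpleValueFactory.getInstance();
        final Set<Statement> statements = Collections.singleton(
                vf.createStatement(
                        vf.createIRI("urn:alice"),
                        vf.createIRI("urn:talksTo"),
                        vf.createIRI("urn:bob")));

        final StatementsConverter converter = new StatementsConverter();

        // Serialize the statements using the RDF4J Rio Binary format, then convert
        // the bytes back into Connect data.
        final byte[] bytes = converter.fromConnectData("statements", null, statements);
        final SchemaAndValue roundTripped = converter.toConnectData("statements", bytes);

        // Prints "true" because the round trip preserves the statement set.
        System.out.println(statements.equals(roundTripped.value()));
    }
}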

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java
new file mode 100644
index 0000000..fb03347
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
+import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that deserializes a set of {@link Statement}s that was
+ * serialized using the RDF4J Rio Binary format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsDeserializer implements Deserializer<Set<Statement>> {
+    private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class);
+
+    private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory();
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // Nothing to do.
+    }
+
+    @Override
+    public Set<Statement> deserialize(final String topic, final byte[] data) {
+        if(data == null || data.length == 0) {
+            // Return null because that is the contract of this method.
+            return null;
+        }
+
+        try {
+            final RDFParser parser = PARSER_FACTORY.getParser();
+            final Set<Statement> statements = new HashSet<>();
+
+            parser.setRDFHandler(new AbstractRDFHandler() {
+                @Override
+                public void handleStatement(final Statement statement) throws RDFHandlerException {
+                    log.debug("Statement: {}", statement);
+                    statements.add( statement );
+                }
+            });
+
+            parser.parse(new ByteArrayInputStream(data), null);
+            return statements;
+
+        } catch(final RDFParseException | RDFHandlerException | IOException e) {
+            log.error("Could not deserialize a Set of Statement objects using the RDF4J Rio Binary format.", e);
+            return null;
+        }
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+}
\ No newline at end of file
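
Because this class implements Kafka's Deserializer interface, it can also be plugged
directly into a plain consumer. A minimal sketch (not part of this patch), assuming a
broker at localhost:9092, a topic named "statements", and the long-based poll(...)
overload of the Kafka client version this module builds against:

import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rya.kafka.connect.api.StatementsDeserializer;
import org.eclipse.rdf4j.model.Statement;

public class ReadStatementsSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "statements-reader");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StatementsDeserializer.class.getName());

        try(final KafkaConsumer<String, Set<Statement>> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("statements"));

            // Each record's value is the Set of Statements that a producer serialized.
            for(final ConsumerRecord<String, Set<Statement>> record : consumer.poll(1000)) {
                record.value().forEach(System.out::println);
            }
        }
    }
}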

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java
new file mode 100644
index 0000000..f2101d6
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Provides a {@link Serializer} and {@link Deserializer} for {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsSerde implements Serde<Set<Statement>> {
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // Nothing to do.
+    }
+
+    @Override
+    public Serializer<Set<Statement>> serializer() {
+        return new StatementsSerializer();
+    }
+
+    @Override
+    public Deserializer<Set<Statement>> deserializer() {
+        return new StatementsDeserializer();
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
new file mode 100644
index 0000000..893df0c
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsSerializer implements Serializer<Set<Statement>> {
+    private static final Logger log = LoggerFactory.getLogger(StatementsSerializer.class);
+
+    private static final BinaryRDFWriterFactory WRITER_FACTORY = new BinaryRDFWriterFactory();
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // Nothing to do.
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final Set<Statement> data) {
+        if(data == null) {
+            // Returning null because that is the contract of this method.
+            return null;
+        }
+
+        // Write the statements using a Binary RDF Writer.
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final RDFWriter writer = WRITER_FACTORY.getWriter(baos);
+        writer.startRDF();
+
+        for(final Statement stmt : data) {
+            // Write the statement.
+            log.debug("Writing Statement: " + stmt);
+            writer.handleStatement(stmt);
+        }
+        writer.endRDF();
+
+        // Return the byte[] version of the data.
+        return baos.toByteArray();
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+}
\ No newline at end of file
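
The producer-side mirror of the consumer sketch above; a minimal sketch (not part of
this patch), assuming a broker at localhost:9092 and a topic named "statements":

import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.kafka.connect.api.StatementsSerializer;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;

public class WriteStatementsSketch {
    public static void main(final String[] args) {
        final ValueFactory vf = SimpleValueFactory.getInstance();
        final Set<Statement> statements = Collections.singleton(
                vf.createStatement(
                        vf.createIRI("urn:alice"),
                        vf.createIRI("urn:talksTo"),
                        vf.createIRI("urn:bob")));

        final Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StatementsSerializer.class.getName());

        try(final KafkaProducer<String, Set<Statement>> producer = new KafkaProducer<>(props)) {
            // The value serializer writes the set using the RDF4J Rio Binary format.
            producer.send(new ProducerRecord<>("statements", statements));
            producer.flush();
        }
    }
}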

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java
new file mode 100644
index 0000000..5c3e2cc
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Contains the common configuration fields for Rya Sinks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class RyaSinkConfig extends AbstractConfig {
+
+    public static final String RYA_INSTANCE_NAME = "rya.instance.name";
+    private static final String RYA_INSTANCE_NAME_DOC = "The name of the Rya instance that will be connected to.";
+
+    /**
+     * @param configDef - The configuration schema definition that will be updated to include
+     *   this configuration's fields. (not null)
+     */
+    public static void addCommonDefinitions(final ConfigDef configDef) {
+        requireNonNull(configDef);
+        configDef.define(RYA_INSTANCE_NAME, Type.STRING, Importance.HIGH, RYA_INSTANCE_NAME_DOC);
+    }
+
+    /**
+     * Constructs an instance of {@link RyaSinkConfig}.
+     *
+     * @param definition - Defines the schema of the configuration. (not null)
+     * @param originals - The key/value pairs that define the configuration. (not null)
+     */
+    public RyaSinkConfig(final ConfigDef definition, final Map<?, ?> originals) {
+        super(definition, originals);
+    }
+
+    /**
+     * @return The name of the Rya instance that will be connected to.
+     */
+    public String getRyaInstanceName() {
+        return super.getString(RYA_INSTANCE_NAME);
+    }
+}
\ No newline at end of file
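
Backend-specific sink modules extend this class and fold the common fields into their
own ConfigDef. A hypothetical sketch (the "example.db.hostname" field and the
ExampleRyaSinkConfig class are illustrative, not part of this patch):

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;

public class ExampleRyaSinkConfig extends RyaSinkConfig {

    public static final String HOSTNAME = "example.db.hostname";
    private static final String HOSTNAME_DOC = "The hostname of the database the sink writes to.";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(HOSTNAME, Type.STRING, Importance.HIGH, HOSTNAME_DOC);
    static {
        // Mix in the fields every Rya Sink shares, such as rya.instance.name.
        RyaSinkConfig.addCommonDefinitions(CONFIG_DEF);
    }

    public ExampleRyaSinkConfig(final Map<?, ?> originals) {
        super(CONFIG_DEF, originals);
    }

    public String getHostname() {
        return super.getString(HOSTNAME);
    }
}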

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java
new file mode 100644
index 0000000..f288af2
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Handles the common components required to create the {@link RyaSinkTask}s that write to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkConnector extends SinkConnector {
+
+    /**
+     * Get the configuration that will be provided to the tasks when {@link #taskConfigs(int)} is invoked.
+     * <p/>
+     * Only called after {@link SinkConnector#start(Map)} has been invoked.
+     *
+     * @return The configuration object for the connector.
+     * @throws IllegalStateException Thrown if {@link SinkConnector#start(Map)} has not been invoked yet.
+     */
+    protected abstract AbstractConfig getConfig() throws IllegalStateException;
+
+    @Override
+    public String version() {
+        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN";
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(final int maxTasks) {
+        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
+        for(int i = 0; i < maxTasks; i++) {
+            configs.add( getConfig().originalsStrings() );
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to do since the RyaSinkConnector has no background monitoring.
+    }
+}
\ No newline at end of file
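
A hypothetical concrete connector (reusing the illustrative ExampleRyaSinkConfig and
ExampleRyaSinkTask sketched nearby, none of which are part of this patch) only needs
to hold its configuration and name its task class:

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;

public class ExampleRyaSinkConnector extends RyaSinkConnector {

    private ExampleRyaSinkConfig config = null;

    @Override
    public void start(final Map<String, String> props) {
        this.config = new ExampleRyaSinkConfig(props);
    }

    @Override
    protected AbstractConfig getConfig() throws IllegalStateException {
        if(config == null) {
            throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
        }
        return config;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return ExampleRyaSinkTask.class;
    }

    @Override
    public ConfigDef config() {
        return ExampleRyaSinkConfig.CONFIG_DEF;
    }
}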

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
new file mode 100644
index 0000000..5ff118a
--- /dev/null
+++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
+
+    @Nullable
+    private SailRepository sailRepo = null;
+
+    @Nullable
+    private SailRepositoryConnection conn = null;
+
+    /**
+     * Throws an exception if the configured Rya Instance is not already installed
+     * within the configured database.
+     *
+     * @param taskConfig - The configuration values that were provided to the task. (not null)
+     * @throws ConnectException The configured Rya Instance is not installed in the configured database,
+     *   or it could not be determined whether it is installed.
+     */
+    protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+    /**
+     * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
+     * Rya Instance.
+     *
+     * @param taskConfig - Configures how the Sail object will be created. (not null)
+     * @return The created Sail object.
+     * @throws ConnectException The Sail object could not be made.
+     */
+    protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+    @Override
+    public String version() {
+        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN";
+    }
+
+    @Override
+    public void start(final Map<String, String> props) throws ConnectException {
+        requireNonNull(props);
+
+        // Ensure the configured Rya Instance is installed within the configured database.
+        checkRyaInstanceExists(props);
+
+        // Create the Sail object that is connected to the Rya Instance.
+        final Sail sail = makeSail(props);
+        sailRepo = new SailRepository( sail );
+        conn = sailRepo.getConnection();
+    }
+
+    @Override
+    public void put(final Collection<SinkRecord> records) {
+        requireNonNull(records);
+
+        // Return immediately if there are no records to handle.
+        if(records.isEmpty()) {
+            return;
+        }
+
+        // If a transaction has not been started yet, then start one.
+        if(!conn.isActive()) {
+            conn.begin();
+        }
+
+        // Iterate through the records and write them to the Sail object.
+        for(final SinkRecord record : records) {
+            // If everything has been configured correctly, then the record's value will be a Set<Statement>.
+            conn.add((Set<? extends Statement>) record.value());
+        }
+    }
+
+    @Override
+    public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+        requireNonNull(currentOffsets);
+        // Commit the current transaction, if one is active. Kafka Connect may invoke
+        // flush() before any records have been put, in which case there is no transaction to commit.
+        if(conn.isActive()) {
+            conn.commit();
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if(conn != null) {
+                conn.close();
+            }
+        } catch(final Exception e) {
+            log.error("Could not close the Sail Repository Connection.", e);
+        }
+
+        try {
+            if(sailRepo != null) {
+                sailRepo.shutDown();
+            }
+        } catch(final Exception e) {
+            log.error("Could not shut down the Sail Repository.", e);
+        }
+    }
+}
\ No newline at end of file
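
Concrete tasks only fill in the two abstract hooks above. A skeletal, hypothetical
sketch (not part of this patch; the instance lookup and Sail construction are
backend-specific and elided here):

import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
import org.eclipse.rdf4j.sail.Sail;

public class ExampleRyaSinkTask extends RyaSinkTask {

    @Override
    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
        final ExampleRyaSinkConfig config = new ExampleRyaSinkConfig(taskConfig);
        // Backend-specific: connect to the database and verify an instance named
        // config.getRyaInstanceName() has been installed; throw a ConnectException if not.
    }

    @Override
    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
        final ExampleRyaSinkConfig config = new ExampleRyaSinkConfig(taskConfig);
        // Backend-specific: build the connection details from 'config' and return an
        // initialized Sail that writes to the configured Rya Instance.
        throw new ConnectException("Sail construction is elided from this sketch.");
    }
}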

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af736749/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java
----------------------------------------------------------------------
diff --git a/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java
new file mode 100644
index 0000000..01e5b76
--- /dev/null
+++ b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link StatementsSerde}.
+ */
+public class StatementsSerdeTest {
+
+    @Test
+    public void serializeAndDeserialize() {
+        // Create the object that will be serialized.
+        final ValueFactory vf = SimpleValueFactory.getInstance();
+
+        final Set<Statement> original = Sets.newHashSet(
+                vf.createStatement(
+                        vf.createIRI("urn:alice"),
+                        vf.createIRI("urn:talksTo"),
+                        vf.createIRI("urn:bob"),
+                        vf.createIRI("urn:testGraph")),
+                vf.createStatement(
+                        vf.createIRI("urn:bob"),
+                        vf.createIRI("urn:talksTo"),
+                        vf.createIRI("urn:charlie"),
+                        vf.createIRI("urn:graph2")),
+                vf.createStatement(
+                        vf.createIRI("urn:charlie"),
+                        vf.createIRI("urn:talksTo"),
+                        vf.createIRI("urn:bob"),
+                        vf.createIRI("urn:graph2")),
+                vf.createStatement(
+                        vf.createIRI("urn:alice"),
+                        vf.createIRI("urn:listensTo"),
+                        vf.createIRI("urn:charlie"),
+                        vf.createIRI("urn:testGraph")));
+
+        // Serialize it.
+        try(final Serde<Set<Statement>> serde = new StatementsSerde()) {
+            final byte[] bytes = serde.serializer().serialize("topic", original);
+
+            // Deserialize it.
+            final Set<Statement> deserialized = serde.deserializer().deserialize("topic", bytes);
+
+            // Show the deserialized value matches the original.
+            assertEquals(original, deserialized);
+        }
+    }
+
+    @Test
+    public void deserializeEmptyData() {
+        try(final Serde<Set<Statement>> serde = new StatementsSerde()) {
+            assertNull( serde.deserializer().deserialize("topic", new byte[0]) );
+        }
+    }
+}
\ No newline at end of file