Posted to notifications@rya.apache.org by kchilton2 <gi...@git.apache.org> on 2018/05/10 16:59:08 UTC
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
GitHub user kchilton2 opened a pull request:
https://github.com/apache/incubator-rya/pull/296
RYA-487 Kafka Connect Sinks
Ignore RYA-494. That's a separate PR that this one depended on to create documentation for the manual.
This PR implements Kafka Connect Sinks for both Accumulo and MongoDB backed Rya. I wrote a manual page that covers the design decisions that were made, notes on deploying the sinks, a quick start, and future work we may want to consider.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kchilton2/incubator-rya RYA-487
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-rya/pull/296.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #296
----
commit ed1def04283a2e893a1c45014628dc7fe10717eb
Author: kchilton2 <ke...@...>
Date: 2018-05-08T22:33:24Z
RYA-494 Fixed a bug where the shell was not loading or displaying all Statements.
commit c2d802bd51d4d6951aa54e48b4038d64462bd8fd
Author: kchilton2 <ke...@...>
Date: 2018-04-17T19:10:26Z
RYA-487 Implement Kafka Connect Sink implementations for Accumulo and Mongo DB backed Rya.
----
---
[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:
https://github.com/apache/incubator-rya/pull/296
Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/758/
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187702698
--- Diff: extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java ---
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link AccumuloRyaSinkConfig}.
+ */
+public class AccumuloRyaSinkConfigTest {
+
+ @Test
+ public void parses() {
--- End diff --
Should there be other tests here confirming what happens when malformed fields are provided?
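A minimal sketch of the kind of negative test being asked about. It does not use `AccumuloRyaSinkConfig`; `SinkSettings` is a hypothetical stand-in parser, and its property keys (`zookeepers`, `rya.instance.name`, `batch.size`) are made up for the example. The pattern is what matters: start from a valid configuration, break one field at a time, and require that each broken variant is rejected.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a sink config with two required fields and one integer field. */
final class SinkSettings {
    final String zookeepers;
    final String ryaInstanceName;
    final int batchSize;

    private SinkSettings(String zookeepers, String ryaInstanceName, int batchSize) {
        this.zookeepers = zookeepers;
        this.ryaInstanceName = ryaInstanceName;
        this.batchSize = batchSize;
    }

    /** Parses the properties, throwing IllegalArgumentException for missing or malformed fields. */
    static SinkSettings parse(Map<String, String> props) {
        String zookeepers = require(props, "zookeepers");
        String ryaInstanceName = require(props, "rya.instance.name");
        String rawBatchSize = props.getOrDefault("batch.size", "100");
        final int batchSize;
        try {
            batchSize = Integer.parseInt(rawBatchSize);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("batch.size must be an integer, got: " + rawBatchSize, e);
        }
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batch.size must be positive, got: " + batchSize);
        }
        return new SinkSettings(zookeepers, ryaInstanceName, batchSize);
    }

    private static String require(Map<String, String> props, String key) {
        String value = props.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required field: " + key);
        }
        return value;
    }
}

/** Negative-test helpers: every malformed variant of a valid config must be rejected. */
final class SinkSettingsNegativeTests {
    static Map<String, String> validProps() {
        Map<String, String> props = new HashMap<>();
        props.put("zookeepers", "zoo1:2181");
        props.put("rya.instance.name", "rya_");
        props.put("batch.size", "50");
        return props;
    }

    /** Returns true if parsing the given properties fails as expected. */
    static boolean rejects(Map<String, String> props) {
        try {
            SinkSettings.parse(props);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    static void runAll() {
        Map<String, String> missingInstance = validProps();
        missingInstance.remove("rya.instance.name");
        check(rejects(missingInstance), "missing rya.instance.name");

        Map<String, String> blankZookeepers = validProps();
        blankZookeepers.put("zookeepers", "  ");
        check(rejects(blankZookeepers), "blank zookeepers");

        Map<String, String> nonNumericBatch = validProps();
        nonNumericBatch.put("batch.size", "lots");
        check(rejects(nonNumericBatch), "non-numeric batch.size");

        Map<String, String> negativeBatch = validProps();
        negativeBatch.put("batch.size", "-1");
        check(rejects(negativeBatch), "negative batch.size");
    }

    private static void check(boolean condition, String caseName) {
        if (!condition) {
            throw new AssertionError("Expected the config to be rejected: " + caseName);
        }
    }
}
```

In a JUnit 4 class like the `AccumuloRyaSinkConfigTest` above, each case would more typically be its own `@Test(expected = ...)` method, so a failure names the exact malformed field.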
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187708536
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --
Maybe I'm not clear on what this annotation does, but again, all the fields are declared Nullable.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187730865
--- Diff: extras/kafka.connect/client/pom.xml ---
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.kafka.connect.parent</artifactId>
+ <version>4.0.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rya.kafka.connect.client</artifactId>
+
+ <name>Apache Rya Kafka Connect - Client</name>
+ <description>Contains a client that may be used to load Statements into
+ a Kafka topic to be read by Kafka Connect.</description>
+
+ <dependencies>
+ <!-- 1st party dependencies. -->
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.sail</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.kafka.connect.api</artifactId>
+ </dependency>
+
+ <!-- 3rd party dependencies. -->
+ <dependency>
+ <groupId>org.eclipse.rdf4j</groupId>
+ <artifactId>rdf4j-model</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.github.stephenc.findbugs</groupId>
+ <artifactId>findbugs-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+
+ <!-- Statement formats we support for loading. -->
--- End diff --
It does, good call. Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187636622
--- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java ---
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException;
+import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand;
+import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand;
+import org.eclipse.rdf4j.model.Statement;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A CLI tool used to read/write {@link Statement}s to/from a Kafka topic using the format
+ * the Rya Kafka Connect Sinks expect.
+ */
+@DefaultAnnotation(NonNull.class)
+public class CLIDriver {
+
+ /**
+ * Maps from command strings to the object that performs the command.
+ */
+ private static final ImmutableMap<String, RyaKafkaClientCommand> COMMANDS;
+ static {
+ final Set<Class<? extends RyaKafkaClientCommand>> commandClasses = new HashSet<>();
+ commandClasses.add(ReadStatementsCommand.class);
+ commandClasses.add(WriteStatementsCommand.class);
+ final ImmutableMap.Builder<String, RyaKafkaClientCommand> builder = ImmutableMap.builder();
+ for(final Class<? extends RyaKafkaClientCommand> commandClass : commandClasses) {
+ try {
+ final RyaKafkaClientCommand command = commandClass.newInstance();
+ builder.put(command.getCommand(), command);
+ } catch (InstantiationException | IllegalAccessException e) {
+ System.err.println("Could not run the application because a RyaKafkaClientCommand is missing its empty constructor.");
+ e.printStackTrace();
+ }
+ }
+ COMMANDS = builder.build();
+ }
+
+ private static final String USAGE = makeUsage(COMMANDS);
+
+ public static void main(final String[] args) {
+ // If no command provided or the command isn't recognized, then print the usage.
+ if (args.length == 0 || !COMMANDS.containsKey(args[0])) {
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ // Fetch the command that will be executed.
+ final String command = args[0];
+ final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
+ final RyaKafkaClientCommand clientCommand = COMMANDS.get(command);
+
+ // Print usage if the arguments are invalid for the command.
+ if(!clientCommand.validArguments(commandArgs)) {
+ System.out.println(clientCommand.getUsage());
+ System.exit(1);
+ }
+
+ // Execute the command.
+ try {
--- End diff --
Spacing is off
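The static block quoted above builds the command map through reflection (`Class.newInstance()`), so a command class without an accessible no-argument constructor is only discovered at runtime, and the driver then starts without that command. A minimal alternative sketch registers command instances directly, so a missing constructor becomes a compile error. `ClientCommand`, `CommandRegistry`, and `EchoCommand` are hypothetical simplifications, not the PR's `RyaKafkaClientCommand` API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified CLI command. */
interface ClientCommand {
    String getCommand();
    boolean validArguments(String[] args);
    String run(String[] args);
}

/** Maps command names to commands; duplicate names fail fast, as ImmutableMap.Builder.build() would. */
final class CommandRegistry {
    private final Map<String, ClientCommand> commands;

    CommandRegistry(ClientCommand... available) {
        Map<String, ClientCommand> byName = new LinkedHashMap<>();
        for (ClientCommand command : available) {
            if (byName.put(command.getCommand(), command) != null) {
                throw new IllegalArgumentException("Duplicate command: " + command.getCommand());
            }
        }
        this.commands = Collections.unmodifiableMap(byName);
    }

    /** Dispatches args[0]; returns a usage message instead of calling System.exit so it stays testable. */
    String dispatch(String[] args) {
        if (args.length == 0 || !commands.containsKey(args[0])) {
            return "Usage: <command> [args...] where <command> is one of " + commands.keySet();
        }
        ClientCommand command = commands.get(args[0]);
        String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
        if (!command.validArguments(commandArgs)) {
            return "Invalid arguments for command: " + command.getCommand();
        }
        return command.run(commandArgs);
    }
}

/** Example command used to exercise the registry. */
final class EchoCommand implements ClientCommand {
    @Override public String getCommand() { return "echo"; }
    @Override public boolean validArguments(String[] args) { return args.length > 0; }
    @Override public String run(String[] args) { return String.join(" ", args); }
}
```

A `main` method would then only translate the returned message into an exit code, keeping the dispatch logic unit-testable.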
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187736288
--- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConnector extends RyaSinkConnector {
+
+ @Nullable
+ private MongoRyaSinkConfig config = null;
+
+ @Override
+ public void start(final Map<String, String> props) {
+ this.config = new MongoRyaSinkConfig( props );
+ }
+
+ @Override
+ protected AbstractConfig getConfig() {
+ if(config == null) {
+ throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
--- End diff --
Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187733216
--- Diff: extras/kafka.connect/README.md ---
@@ -0,0 +1,22 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+The parent project for all Rya Kafka Connect work. All projects that are part
+of that system must use this project's pom as their parent pom.
--- End diff --
Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/incubator-rya/pull/296
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187739885
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --
gotcha, thanks for the doc paste dump
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187698179
--- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java ---
@@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() {
super();
}
- public AccumuloRdfConfiguration(Configuration other) {
+ public AccumuloRdfConfiguration(final Configuration other) {
super(other);
}
- public AccumuloRdfConfigurationBuilder getBuilder() {
+ public static AccumuloRdfConfigurationBuilder getBuilder() {
--- End diff --
isn't the convention usually builder() for static builder functions?
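For comparison, the convention mentioned here is the one Guava follows elsewhere in this PR (`ImmutableMap.builder()` in `CLIDriver`): a static `builder()` factory that returns a fresh builder, whose `build()` produces an immutable object. A minimal sketch of that shape; `ConnectionSettings` and its fields are hypothetical and do not reflect `AccumuloRdfConfiguration`'s actual builder.

```java
import static java.util.Objects.requireNonNull;

/** Hypothetical immutable settings object created through a static builder() factory. */
final class ConnectionSettings {
    private final String user;
    private final String instance;

    private ConnectionSettings(Builder builder) {
        // Validate once, at build time, so every ConnectionSettings instance is complete.
        this.user = requireNonNull(builder.user, "user");
        this.instance = requireNonNull(builder.instance, "instance");
    }

    /** Static factory named builder(), matching the Guava-style convention. */
    static Builder builder() {
        return new Builder();
    }

    String getUser() { return user; }
    String getInstance() { return instance; }

    static final class Builder {
        private String user;
        private String instance;

        private Builder() {}

        Builder setUser(String user) { this.user = user; return this; }
        Builder setInstance(String instance) { this.instance = instance; return this; }
        ConnectionSettings build() { return new ConnectionSettings(this); }
    }
}
```

Making the factory static also means callers no longer need an existing configuration object just to obtain a builder.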
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187634162
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java ---
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Handles the common components required to task {@link RyaSinkTask}s that write to Rya.
+ * <p>
+ * Implementations of this class only need to specify functionality that is specific to the Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkConnector extends SinkConnector {
+
+ /**
+ * Get the configuration that will be provided to the tasks when {@link #taskConfigs(int)} is invoked.
+ * <p>
+ * Only called after start has been invoked.
+ *
+ * @return The configuration object for the connector.
+ * @throws IllegalStateException Thrown if {@link SinkConnector#start(Map)} has not been invoked yet.
+ */
+ protected abstract AbstractConfig getConfig() throws IllegalStateException;
+
+ @Override
+ public String version() {
+ return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN";
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(final int maxTasks) {
+ final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
+ for(int i = 0; i < maxTasks; i++) {
+ configs.add( getConfig().originalsStrings() );
+ }
+ return configs;
+ }
+
+ @Override
+ public void stop() {
+ // Nothing to do since the RyaSinkconnector has no background monitoring.
--- End diff --
typo. RyaSinkConnector
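The connector above hands every task an identical copy of the configuration it received in `start`, while `getConfig()` refuses to answer before `start` has run (the guard quoted earlier from `MongoRyaSinkConnector`). Identical task configs work for a sink because Kafka Connect runs one task per returned map and spreads the topic's partitions across those tasks. A minimal JDK-only sketch of that lifecycle, using a hypothetical `ReplicatingConnector` in place of Kafka's `SinkConnector`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Kafka-free model of the start/getConfig/taskConfigs lifecycle. */
class ReplicatingConnector {
    private Map<String, String> config = null; // null until start(props) is invoked

    void start(Map<String, String> props) {
        // Defensive copy so later changes to the caller's map are not seen by tasks.
        this.config = Collections.unmodifiableMap(new HashMap<>(props));
    }

    Map<String, String> getConfig() {
        if (config == null) {
            throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
        }
        return config;
    }

    /** Every task receives the same settings; one task runs per returned map. */
    List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(getConfig()));
        }
        return configs;
    }
}
```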
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187702555
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+ }
+
+ // Use a RyaClient to see if the configured instance exists.
+ try {
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ config.getUsername(),
+ config.getPassword().toCharArray(),
+ config.getClusterName(),
+ config.getZookeepers());
+ final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
+ }
+
+ } catch (final RyaClientException e) {
+ throw new ConnectException("Unable to determine if the Rya Instance named " +
+ config.getRyaInstanceName() + " has been installed.", e);
+ }
+ }
+
+ @Override
+ protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Move the configuration into a Rya Configuration object.
+ final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration();
+ ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+ ryaConfig.setAccumuloZookeepers( config.getZookeepers() );
+ ryaConfig.setAccumuloInstance( config.getClusterName() );
+ ryaConfig.setAccumuloUser( config.getUsername() );
+ ryaConfig.setAccumuloPassword( config.getPassword() );
+
+ // Create the Sail object.
+ try {
+ return RyaSailFactory.getInstance(ryaConfig);
--- End diff --
Yeah, when we started the Rya Streams project, Geo was considered a non-requirement.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187629343
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+ }
+
+ // Use a RyaClient to see if the configured instance exists.
+ try {
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ config.getUsername(),
+ config.getPassword().toCharArray(),
+ config.getClusterName(),
+ config.getZookeepers());
+ final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
--- End diff --
Use LogUtils.
LogUtils.clean(config.getRyaInstanceName())
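Presumably the concern is that the instance name comes from user-supplied configuration and may end up in a log via the exception message, where embedded line breaks could forge extra log lines. A sketch of that idea using a hypothetical `cleanForLog` helper; replacing ISO control characters (including CR and LF) with `_` is an assumption for illustration, not a description of what Rya's `LogUtils.clean` actually does.

```java
/** Hypothetical log-sanitizing helper; NOT the actual Rya LogUtils implementation. */
final class LogSanitizer {
    private LogSanitizer() {}

    /** Replaces ISO control characters (including CR and LF) so one value cannot span several log lines. */
    static String cleanForLog(String value) {
        if (value == null) {
            return "null";
        }
        StringBuilder sb = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            sb.append(Character.isISOControl(c) ? '_' : c);
        }
        return sb.toString();
    }

    /** Builds the error message from the diff above with the instance name sanitized. */
    static String notInstalledMessage(String ryaInstanceName) {
        return "The Rya Instance named " + cleanForLog(ryaInstanceName) + " has not been installed.";
    }
}
```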
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187633086
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsSerializer implements Serializer<Set<Statement>> {
+ private static final Logger log = LoggerFactory.getLogger(StatementsSerializer.class);
+
+ private static final BinaryRDFWriterFactory WRITER_FACTORY = new BinaryRDFWriterFactory();
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ // Nothing to do.
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Set<Statement> data) {
+ if(data == null) {
+ // Returning null because that is the contract of this method.
+ return null;
+ }
+
+ // Write the statements using a Binary RDF Writer.
+ final ByteArrayOutputStream boas = new ByteArrayOutputStream();
--- End diff --
Change the variable name to baos (short for ByteArrayOutputStream).
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187709021
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
@@ -0,0 +1,144 @@
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+ private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
+
+ @Nullable
+ private SailRepository sailRepo = null;
+
+ @Nullable
+ private SailRepositoryConnection conn = null;
+
+ /**
+ * Throws an exception if the configured Rya Instance is not already installed
+ * within the configured database.
+ *
+ * @param taskConfig - The configuration values that were provided to the task. (not null)
+ * @throws ConnectException The configured Rya Instance is not installed to the configured database
+ * or we were unable to figure out if it is installed.
+ */
+ protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+ /**
+ * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
+ * Rya Instance.
+ *
+ * @param taskConfig - Configures how the Sail object will be created. (not null)
+ * @return The created Sail object.
+ * @throws ConnectException The Sail object could not be made.
+ */
+ protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+ @Override
+ public String version() {
+ return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
+ }
+
+ @Override
+ public void start(final Map<String, String> props) throws ConnectException {
+ requireNonNull(props);
+
+ // Ensure the configured Rya Instance is installed within the configured database.
+ checkRyaInstanceExists(props);
+
+ // Create the Sail object that is connected to the Rya Instance.
+ final Sail sail = makeSail(props);
+ sailRepo = new SailRepository( sail );
+ conn = sailRepo.getConnection();
+ }
+
+ @Override
+ public void put(final Collection<SinkRecord> records) {
+ requireNonNull(records);
+
+ // Return immediately if there are no records to handle.
+ if(records.isEmpty()) {
+ return;
+ }
+
+ // If a transaction has not been started yet, then start one.
+ if(!conn.isActive()) {
+ conn.begin();
+ }
+
+ // Iterate through the records and write them to the Sail object.
+ for(final SinkRecord record : records) {
+ // If everything has been configured correctly, then the record's value will be a Set<Statement>.
+ conn.add((Set<? extends Statement>) record.value());
+ }
+ }
+
+ @Override
+ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffets) {
+ // Flush the current transaction.
+ conn.commit();
+ }
+
+ @Override
+ public void stop() {
+ try {
--- End diff --
What about the actual Sail that is used? Are you expecting it to be shut down by whatever uses this?
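In the RyaSinkTask diff quoted above, `put` starts a transaction only when none is active and adds every record to it, and `flush` commits. All records delivered between two flushes therefore land in one commit. The stdlib-only sketch below shows that batching pattern. The hypothetical `FakeConnection` stands in for RDF4J's `SailRepositoryConnection`, whose `isActive`, `begin`, `add` and `commit` methods the quoted code calls. Unlike the quoted `flush`, the sketch guards `commit` with `isActive()` so that a flush with nothing pending does nothing. Whether the real code needs that guard depends on how the RDF4J connection handles a commit with no active transaction.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for a transactional repository connection. */
final class FakeConnection {
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> commits = new ArrayList<>();
    private boolean active = false;

    boolean isActive() { return active; }

    void begin() { active = true; }

    void add(final Collection<String> statements) {
        if (!active) {
            throw new IllegalStateException("No active transaction.");
        }
        pending.addAll(statements);
    }

    void commit() {
        if (!active) {
            throw new IllegalStateException("No active transaction.");
        }
        commits.add(new ArrayList<>(pending)); // record what this commit contained
        pending.clear();
        active = false;
    }

    List<List<String>> commits() { return commits; }
}

/** Sketch of the put/flush batching used by the quoted RyaSinkTask. */
final class BatchingTask {
    final FakeConnection conn = new FakeConnection();

    void put(final Collection<? extends Collection<String>> records) {
        if (records.isEmpty()) {
            return;                       // nothing to write
        }
        if (!conn.isActive()) {
            conn.begin();                 // open a transaction lazily
        }
        for (final Collection<String> record : records) {
            conn.add(record);             // each record carries a set of statements
        }
    }

    void flush() {
        if (conn.isActive()) {
            conn.commit();                // commit everything put since the last flush
        }
    }
}
```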
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187734692
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java ---
@@ -0,0 +1,63 @@
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkConnector extends RyaSinkConnector {
+
+ @Nullable
+ private AccumuloRyaSinkConfig config = null;
+
+ @Override
+ public void start(final Map<String, String> props) {
+ this.config = new AccumuloRyaSinkConfig( props );
+ }
+
+ @Override
+ protected AbstractConfig getConfig() {
+ if(config == null) {
+ throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
--- End diff --
Done.
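The quoted `getConfig` guard relies on the connector lifecycle: Kafka Connect calls `start(props)` on a connector before it requests task configurations, so a null config means the method was called out of order. Below is a minimal sketch of that fail-fast guard. `GuardedConnector` and its map-based config are hypothetical stand-ins for `AccumuloRyaSinkConnector` and `AccumuloRyaSinkConfig`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a connector that must be started before use. */
final class GuardedConnector {
    private Map<String, String> config = null;   // set by start(props)

    void start(final Map<String, String> props) {
        this.config = new HashMap<>(props);      // defensive copy of the connector properties
    }

    Map<String, String> getConfig() {
        if (config == null) {
            // Same message as the quoted code: fail fast instead of returning null.
            throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
        }
        return config;
    }
}
```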
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187638405
--- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java ---
@@ -0,0 +1,122 @@
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+ @Nullable
+ final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+ @Nullable
+ final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+ // Connect a Mongo Client to the configured Mongo DB instance.
+ final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+ final boolean hasCredentials = username != null && password != null;
+
+ try(MongoClient mongoClient = hasCredentials ?
+ new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+ new MongoClient(serverAddr)) {
+ // Use a RyaClient to see if the configured instance exists.
+ // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+ final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+ config.getHostname(),
+ config.getPort(),
+ Optional.ofNullable(username),
+ Optional.ofNullable(password));
+
+ final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
+ }
+ } catch(final RyaClientException e) {
+ throw new ConnectException("Unable to determine if the Rya Instance named " +
+ config.getRyaInstanceName() + " has been installed.", e);
+ }
+ }
+
+ @Override
+ protected Sail makeSail(final Map<String, String> taskConfig) {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+
+ // Move the configuration into a Rya Configuration object.
+ final MongoDBRdfConfiguration ryaConfig = new MongoDBRdfConfiguration();
+ ConfigUtils.setUseMongo(ryaConfig, true);
+ ryaConfig.setMongoDBName( config.getRyaInstanceName() );
+ ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+ ryaConfig.setMongoHostname( config.getHostname() );
+ ryaConfig.setMongoPort( "" + config.getPort() );
+
+ if(!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) {
+ ryaConfig.setMongoUser( config.getUsername() );
+ ryaConfig.setMongoPassword( config.getPassword() );
+ }
+
+ // Create the Sail object.
+ try {
+ return RyaSailFactory.getInstance(ryaConfig);
--- End diff --
Add geo support
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187667235
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+ }
+
+ // Use a RyaClient to see if the configured instance exists.
+ try {
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ config.getUsername(),
+ config.getPassword().toCharArray(),
+ config.getClusterName(),
+ config.getZookeepers());
+ final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
+ }
+
+ } catch (final RyaClientException e) {
+ throw new ConnectException("Unable to determine if the Rya Instance named " +
+ config.getRyaInstanceName() + " has been installed.", e);
+ }
+ }
+
+ @Override
+ protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Move the configuration into a Rya Configuration object.
+ final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration();
+ ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+ ryaConfig.setAccumuloZookeepers( config.getZookeepers() );
+ ryaConfig.setAccumuloInstance( config.getClusterName() );
+ ryaConfig.setAccumuloUser( config.getUsername() );
+ ryaConfig.setAccumuloPassword( config.getPassword() );
+
+ // Create the Sail object.
+ try {
+ return RyaSailFactory.getInstance(ryaConfig);
--- End diff --
Adding geo support here could be a future improvement, once we use plugins and no longer need to reference the geo code directly. Leave it as RyaSailFactory for now.
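One possible shape for the plugin approach mentioned here: optional backends such as geo register themselves through `java.util.ServiceLoader`. The sink code then never references geo classes directly and falls back to a default factory when no plugin is on the classpath. This is a hypothetical design sketch. `SailFactoryPlugin` and `FactorySelector` are not Rya APIs, and a real plugin would create a Sail rather than just report a name.

```java
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.function.Supplier;

/** Hypothetical plugin interface an optional module (e.g. geo) could implement. */
@FunctionalInterface
interface SailFactoryPlugin {
    /** Name of the factory this plugin provides. */
    String name();
}

final class FactorySelector {
    private FactorySelector() { }

    /**
     * Returns the first plugin registered under META-INF/services, or the
     * supplied default when no plugin is present on the classpath.
     */
    static SailFactoryPlugin select(final Supplier<SailFactoryPlugin> defaultFactory) {
        final Iterator<SailFactoryPlugin> plugins = ServiceLoader.load(SailFactoryPlugin.class).iterator();
        return plugins.hasNext() ? plugins.next() : defaultFactory.get();
    }
}
```

With no provider registered, `FactorySelector.select(() -> () -> "RyaSailFactory")` falls back to the default. This matches "leave as RyaSailFactory for now" until a geo plugin exists.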
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187706947
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java ---
@@ -0,0 +1,77 @@
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
--- End diff --
It might be worth mentioning that this Serializer uses an RDFWriter, and that the Deserializer uses an RDFParser.
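The Serializer and Deserializer described here form a symmetric pair: the writer side fills a byte array that the reader side must parse back exactly. The stdlib-only sketch below shows that symmetry. `Set<String>` stands in for `Set<Statement>`, and a length-prefixed `DataOutputStream` format stands in for RDF4J Binary RDF. Following the quoted comment that returning null "is the contract of this method", null maps to null in both directions. `StringsCodec` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical codec: write() mirrors the Serializer, read() mirrors the Deserializer. */
final class StringsCodec {
    private StringsCodec() { }

    static byte[] write(final Set<String> data) {
        if (data == null) {
            return null;                          // null in, null out
        }
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(data.size());            // element count first
            for (final String s : data) {
                out.writeUTF(s);                  // then each element
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    static Set<String> read(final byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            final int count = in.readInt();       // read the count the writer emitted
            final Set<String> result = new HashSet<>();
            for (int i = 0; i < count; i++) {
                result.add(in.readUTF());         // read elements in the order written
            }
            return result;
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```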
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187734887
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
@@ -0,0 +1,144 @@
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --
It indicates that parameters are non-null by default.
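`@DefaultAnnotation(NonNull.class)` is a hint for static analysis (FindBugs/SpotBugs) and does not reject nulls at runtime. That is why the quoted classes still call `requireNonNull` on their inputs and explicitly mark the lazily initialized `sailRepo` and `conn` fields `@Nullable`. The stdlib-only sketch below shows that split. The annotations appear only as comments because they live in a separate jar, and `TaskState` is a hypothetical class.

```java
import static java.util.Objects.requireNonNull;

import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical example of the nullness convention. In the real code the class
 * would carry @DefaultAnnotation(NonNull.class) and the field @Nullable.
 */
final class TaskState {
    // @Nullable: stays null until start() is called.
    private Map<String, String> props = null;

    void start(final Map<String, String> props) {
        // The annotation only informs static analysis; this call enforces it at runtime.
        this.props = requireNonNull(props);
    }

    /** Exposes the nullable field without handing callers a raw null. */
    Optional<Map<String, String>> props() {
        return Optional.ofNullable(props);
    }
}
```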
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187729286
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+ }
+
+ // Use a RyaClient to see if the configured instance exists.
+ try {
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ config.getUsername(),
+ config.getPassword().toCharArray(),
+ config.getClusterName(),
+ config.getZookeepers());
+ final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
--- End diff --
I'm assuming rya.api. Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187735910
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
@@ -0,0 +1,144 @@
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+ private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
+
+ @Nullable
+ private SailRepository sailRepo = null;
+
+ @Nullable
+ private SailRepositoryConnection conn = null;
+
+ /**
+ * Throws an exception if the configured Rya Instance is not already installed
+ * within the configured database.
+ *
+ * @param taskConfig - The configuration values that were provided to the task. (not null)
+ * @throws ConnectException The configured Rya Instance is not installed to the configured database
+ * or we were unable to figure out if it is installed.
+ */
+ protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+ /**
+ * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
+ * Rya Instance.
+ *
+ * @param taskConfig - Configures how the Sail object will be created. (not null)
+ * @return The created Sail object.
+ * @throws ConnectException The Sail object could not be made.
+ */
+ protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+ @Override
+ public String version() {
+ return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
+ }
+
+ @Override
+ public void start(final Map<String, String> props) throws ConnectException {
+ requireNonNull(props);
+
+ // Ensure the configured Rya Instance is installed within the configured database.
+ checkRyaInstanceExists(props);
+
+ // Create the Sail object that is connected to the Rya Instance.
+ final Sail sail = makeSail(props);
+ sailRepo = new SailRepository( sail );
+ conn = sailRepo.getConnection();
+ }
+
+ @Override
+ public void put(final Collection<SinkRecord> records) {
+ requireNonNull(records);
+
+ // Return immediately if there are no records to handle.
+ if(records.isEmpty()) {
+ return;
+ }
+
+ // If a transaction has not been started yet, then start one.
+ if(!conn.isActive()) {
+ conn.begin();
+ }
+
+ // Iterate through the records and write them to the Sail object.
+ for(final SinkRecord record : records) {
+ // If everything has been configured correctly, then the record's value will be a Set<Statement>.
+ conn.add((Set<? extends Statement>) record.value());
+ }
+ }
+
+ @Override
+ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffets) {
+ // Flush the current transaction.
+ conn.commit();
+ }
+
+ @Override
+ public void stop() {
+ try {
--- End diff --
That Sail object is shut down internally by the SailRepository class when the SailRepository's shutDown() method is invoked. Check line 169 of SailRepository.
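This comment relies on a particular task lifecycle, sketched below. put() opens a transaction if none is active, flush() commits it, and stop() closes the connection and then shuts down only the repository, because the repository in turn shuts down the Sail it wraps. FakeSail, FakeRepository, and FakeConnection are hypothetical stand-ins for the RDF4J Sail, SailRepository, and SailRepositoryConnection so the example runs without RDF4J on the classpath. They model only the ownership rule, not RDF4J's real behavior. The isActive() guard in flush() is a choice made for this sketch, not something the diff shows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkLifecycleSketch {

    /** Hypothetical stand-in for an RDF4J Sail: counts how often it was shut down. */
    static class FakeSail {
        int shutDownCalls = 0;
        void shutDown() { shutDownCalls++; }
    }

    /** Hypothetical stand-in for a repository connection with begin/add/commit semantics. */
    static class FakeConnection {
        private boolean active = false;
        private final List<String> pending = new ArrayList<>();
        final List<String> committed = new ArrayList<>();
        boolean closed = false;

        boolean isActive() { return active; }
        void begin() { active = true; }
        void add(final String stmt) { pending.add(stmt); }
        void commit() { committed.addAll(pending); pending.clear(); active = false; }
        void close() { closed = true; }
    }

    /** Hypothetical stand-in for SailRepository: it owns the Sail and shuts it down. */
    static class FakeRepository {
        private final FakeSail sail;
        FakeRepository(final FakeSail sail) { this.sail = sail; }
        FakeConnection getConnection() { return new FakeConnection(); }
        void shutDown() { sail.shutDown(); } // Delegates to the wrapped Sail.
    }

    final FakeSail sail = new FakeSail();
    final FakeRepository repo = new FakeRepository(sail);
    final FakeConnection conn = repo.getConnection();

    void put(final List<String> records) {
        if (records.isEmpty()) {
            return;
        }
        // Start a transaction only if one is not already open.
        if (!conn.isActive()) {
            conn.begin();
        }
        records.forEach(conn::add);
    }

    void flush() {
        // Commit only if a transaction is open, so a flush that follows no put() is a no-op here.
        if (conn.isActive()) {
            conn.commit();
        }
    }

    void stop() {
        // Close the connection, then shut down the repository.
        // The repository shuts down the Sail itself, so stop() never calls sail.shutDown().
        conn.close();
        repo.shutDown();
    }

    public static void main(final String[] args) {
        final SinkLifecycleSketch task = new SinkLifecycleSketch();
        task.put(Arrays.asList("s1", "s2"));
        task.flush();
        task.flush();
        task.stop();
        System.out.println(task.conn.committed + " sailShutDowns=" + task.sail.shutDownCalls);
        // prints [s1, s2] sailShutDowns=1
    }
}
```

The Sail is shut down exactly once even though stop() never references it. Shutting it down a second time from the task would duplicate what the repository already does.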
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187640119
--- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java ---
@@ -274,16 +274,17 @@ public boolean getUseAggregationPipeline() {
* on their child subtrees.
* @param value whether to use aggregation pipeline optimization.
*/
- public void setUseAggregationPipeline(boolean value) {
+ public void setUseAggregationPipeline(final boolean value) {
setBoolean(USE_AGGREGATION_PIPELINE, value);
}
@Override
public List<Class<QueryOptimizer>> getOptimizers() {
- List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
+ final List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
if (getUseAggregationPipeline()) {
- Class<?> cl = AggregationPipelineQueryOptimizer.class;
+ final Class<?> cl = AggregationPipelineQueryOptimizer.class;
@SuppressWarnings("unchecked")
+ final
--- End diff --
Put final on the same line as the variable declaration it modifies.
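The sketch below shows the requested layout. The @SuppressWarnings annotation stays on its own line, and final sits on the same line as the declaration it modifies. String and CharSequence stand in for AggregationPipelineQueryOptimizer and QueryOptimizer so the example compiles with only the JDK. collect() is a hypothetical helper, not the real getOptimizers().

```java
import java.util.ArrayList;
import java.util.List;

public class FinalPlacementSketch {

    /** Hypothetical helper with the same shape as getOptimizers(): an unchecked cast from Class<?>. */
    static List<Class<CharSequence>> collect() {
        final List<Class<CharSequence>> classes = new ArrayList<>();
        final Class<?> cl = String.class;
        // The annotation applies to the declaration below; final belongs on that same line.
        @SuppressWarnings("unchecked")
        final Class<CharSequence> charSequenceClass = (Class<CharSequence>) cl;
        classes.add(charSequenceClass);
        return classes;
    }

    public static void main(final String[] args) {
        System.out.println(collect()); // prints [class java.lang.String]
    }
}
```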
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187629391
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+ }
+
+ // Use a RyaClient to see if the configured instance exists.
+ try {
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ config.getUsername(),
+ config.getPassword().toCharArray(),
+ config.getClusterName(),
+ config.getZookeepers());
+ final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
+ }
+
+ } catch (final RyaClientException e) {
+ throw new ConnectException("Unable to determine if the Rya Instance named " +
+ config.getRyaInstanceName() + " has been installed.", e);
--- End diff --
same
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187731028
--- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java ---
@@ -0,0 +1,121 @@
+package org.apache.rya.kafka.connect.client;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException;
+import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand;
+import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand;
+import org.eclipse.rdf4j.model.Statement;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A CLI tool used to read/write {@link Statement}s to/from a Kafka topic using the format
+ * the Rya Kafka Connect Sinks expect.
+ */
+@DefaultAnnotation(NonNull.class)
+public class CLIDriver {
+
+ /**
+ * Maps from command strings to the object that performs the command.
+ */
+ private static final ImmutableMap<String, RyaKafkaClientCommand> COMMANDS;
+ static {
+ final Set<Class<? extends RyaKafkaClientCommand>> commandClasses = new HashSet<>();
+ commandClasses.add(ReadStatementsCommand.class);
+ commandClasses.add(WriteStatementsCommand.class);
+ final ImmutableMap.Builder<String, RyaKafkaClientCommand> builder = ImmutableMap.builder();
+ for(final Class<? extends RyaKafkaClientCommand> commandClass : commandClasses) {
+ try {
+ final RyaKafkaClientCommand command = commandClass.newInstance();
+ builder.put(command.getCommand(), command);
+ } catch (InstantiationException | IllegalAccessException e) {
+ System.err.println("Could not run the application because a RyaKafkaClientCommand is missing its empty constructor.");
+ e.printStackTrace();
+ }
+ }
+ COMMANDS = builder.build();
+ }
+
+ private static final String USAGE = makeUsage(COMMANDS);
+
+ public static void main(final String[] args) {
+ // If no command provided or the command isn't recognized, then print the usage.
+ if (args.length == 0 || !COMMANDS.containsKey(args[0])) {
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ // Fetch the command that will be executed.
+ final String command = args[0];
+ final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
+ final RyaKafkaClientCommand clientCommand = COMMANDS.get(command);
+
+ // Print usage if the arguments are invalid for the command.
+ if(!clientCommand.validArguments(commandArgs)) {
+ System.out.println(clientCommand.getUsage());
+ System.exit(1);
+ }
+
+ // Execute the command.
+ try {
--- End diff --
Done.
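The CLIDriver diff above builds its command table by reflectively instantiating each RyaKafkaClientCommand class and then dispatching on args[0]. The sketch below shows the same name-to-command dispatch. It assumes a simplified, hypothetical Command interface with read/write stand-ins, and it swaps in constructor references (Supplier) for Class.newInstance(), which is deprecated as of Java 9. With constructor references, a missing no-arg constructor becomes a compile error rather than a runtime InstantiationException that silently drops the command from the map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class CommandRegistrySketch {

    /** Hypothetical, simplified stand-in for RyaKafkaClientCommand. */
    interface Command {
        String getCommand();
        String execute(String[] args);
    }

    static class ReadCommand implements Command {
        public String getCommand() { return "read"; }
        public String execute(final String[] args) { return "read " + Arrays.toString(args); }
    }

    static class WriteCommand implements Command {
        public String getCommand() { return "write"; }
        public String execute(final String[] args) { return "write " + Arrays.toString(args); }
    }

    /** Maps each command string to the object that performs it. */
    static final Map<String, Command> COMMANDS;
    static {
        final List<Supplier<Command>> factories = new ArrayList<>();
        factories.add(ReadCommand::new);
        factories.add(WriteCommand::new);

        final Map<String, Command> map = new LinkedHashMap<>();
        for (final Supplier<Command> factory : factories) {
            final Command command = factory.get();
            map.put(command.getCommand(), command);
        }
        COMMANDS = Collections.unmodifiableMap(map);
    }

    /** Returns the command's output, or null when the command name is missing or unknown. */
    static String dispatch(final String[] args) {
        if (args.length == 0 || !COMMANDS.containsKey(args[0])) {
            return null; // The real CLIDriver prints its usage and exits with status 1 here.
        }
        final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
        return COMMANDS.get(args[0]).execute(commandArgs);
    }
}
```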
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187701248
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java ---
@@ -0,0 +1,63 @@
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --
Since there is one field that is marked @Nullable, is this annotation useful?
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187742000
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java ---
@@ -0,0 +1,77 @@
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
--- End diff --
I mean that's true, but I don't think it's worth documenting.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187702276
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+ }
+
+ // Use a RyaClient to see if the configured instance exists.
+ try {
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ config.getUsername(),
+ config.getPassword().toCharArray(),
+ config.getClusterName(),
+ config.getZookeepers());
+ final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
--- End diff --
LogUtils from which package?
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187637697
--- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java ---
@@ -0,0 +1,187 @@
+package org.apache.rya.kafka.connect.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.kafka.connect.api.StatementsSerializer;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
+import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.Rio;
+import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
+import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect Sink format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class WriteStatementsCommand implements RyaKafkaClientCommand {
+ private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class);
+
+ /**
+ * Command line parameters that are used by this command to configure itself.
+ */
+ public static class WriteParameters extends KafkaParameters {
+ @Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.")
+ public String statementsFile;
+ }
+
+ @Override
+ public String getCommand() {
+ return "write";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Writes Statements to the specified Kafka topic.";
+ }
+
+ @Override
+ public boolean validArguments(final String[] args) {
+ boolean valid = true;
+ try {
+ new JCommander(new WriteParameters(), args);
+ } catch(final ParameterException e) {
+ valid = false;
+ }
+ return valid;
+ }
+
+ /**
+ * @return Describes what arguments may be provided to the command.
+ */
+ @Override
+ public String getUsage() {
+ final JCommander parser = new JCommander(new WriteParameters());
+
+ final StringBuilder usage = new StringBuilder();
+ parser.usage(usage);
+ return usage.toString();
+ }
+
+ @Override
+ public void execute(final String[] args) throws ArgumentsException, ExecutionException {
+ requireNonNull(args);
+
+ // Parse the command line arguments.
+ final WriteParameters params = new WriteParameters();
+ try {
+ new JCommander(params, args);
+ } catch(final ParameterException e) {
+ throw new ArgumentsException("Could not stream the query's results because of invalid command line parameters.", e);
+ }
+
+ // Verify the configured statements file path.
+ final Path statementsPath = Paths.get(params.statementsFile);
+ if(!statementsPath.toFile().exists()) {
+ throw new ArgumentsException("Could not load statements at path '" + statementsPath + "' because that " +
+ "file does not exist. Make sure you've entered the correct path.");
+ }
+
+ // Create an RDF Parser whose format is derived from the statementPath's file extension.
+ final String filename = statementsPath.getFileName().toString();
+ final RDFFormat format = RdfFormatUtils.forFileName(filename);
+ if (format == null) {
+ throw new UnsupportedRDFormatException("Unknown RDF format for the file: " + filename);
+ }
+ final RDFParser parser = Rio.createParser(format);
+
+ // Set up the producer.
+ try(Producer<String, Set<Statement>> producer = makeProducer(params)) {
+ // Set a handler that writes the statements to the specified kafka topic. It writes batches of 5 Statements.
+ parser.setRDFHandler(new AbstractRDFHandler() {
+
+ private Set<Statement> batch = new HashSet<>(5);
+
+ @Override
+ public void startRDF() throws RDFHandlerException {
+ log.trace("Starting loading statements.");
+ }
+
+ @Override
+ public void handleStatement(final Statement stmnt) throws RDFHandlerException {
+ log.trace("Adding statement.");
+ batch.add(stmnt);
+
+ if(batch.size() == 5) {
+ flushBatch();
+ }
+ }
+
+ @Override
+ public void endRDF() throws RDFHandlerException {
+ if(!batch.isEmpty()) {
+ flushBatch();
+ }
+ log.trace("Done.");
+ }
+
+ private void flushBatch() {
+ log.trace("Flushing batch of size " + batch.size());
+ producer.send(new ProducerRecord<>(params.topic, null, batch));
+ batch = new HashSet<>(5);
+ producer.flush();
+ }
+ });
+
+ // Do the parse and load.
+ try {
+ parser.parse(Files.newInputStream(statementsPath), "");
+ } catch (RDFParseException | RDFHandlerException | IOException e) {
+ throw new ExecutionException("Could not load the RDF file's Statements into the Kafka topic.", e);
+ }
+ }
+ }
+
+ private Producer<String, Set<Statement>> makeProducer(final KafkaParameters params) {
--- End diff --
make static
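makeProducer() reads only its params argument and never touches instance state, so it can be static. The sketch below shows a static builder for the producer settings. The Kafka keys are the literal values of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG, and VALUE_SERIALIZER_CLASS_CONFIG. KafkaParams and its hostname/port fields are hypothetical, because the diff does not show the fields of KafkaParameters. The serializer classes are given by name so the example compiles without Kafka on the classpath.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    /** Hypothetical stand-in for the command's KafkaParameters; the real field names are not shown. */
    static class KafkaParams {
        final String hostname;
        final String port;

        KafkaParams(final String hostname, final String port) {
            this.hostname = hostname;
            this.port = port;
        }
    }

    /**
     * Builds producer settings from the parameters alone. Nothing here reads
     * instance state, which is what allows the method to be static.
     */
    static Properties makeProducerProperties(final KafkaParams params) {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", params.hostname + ":" + params.port);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.rya.kafka.connect.api.StatementsSerializer");
        return props;
    }
}
```

With Kafka on the classpath, the real static method would hand these properties to new KafkaProducer<>(props).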
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187729543
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+ }
+
+ // Use a RyaClient to see if the configured instance exists.
+ try {
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ config.getUsername(),
+ config.getPassword().toCharArray(),
+ config.getClusterName(),
+ config.getZookeepers());
+ final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
+ }
+
+ } catch (final RyaClientException e) {
+ throw new ConnectException("Unable to determine if the Rya Instance named " +
+ config.getRyaInstanceName() + " has been installed.", e);
--- End diff --
Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187731146
--- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java ---
@@ -0,0 +1,187 @@
+package org.apache.rya.kafka.connect.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.kafka.connect.api.StatementsSerializer;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
+import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.Rio;
+import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
+import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect Sink format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class WriteStatementsCommand implements RyaKafkaClientCommand {
+ private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class);
+
+ /**
+ * Command line parameters that are used by this command to configure itself.
+ */
+ public static class WriteParameters extends KafkaParameters {
+ @Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.")
+ public String statementsFile;
+ }
+
+ @Override
+ public String getCommand() {
+ return "write";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Writes Statements to the specified Kafka topic.";
+ }
+
+ @Override
+ public boolean validArguments(final String[] args) {
+ boolean valid = true;
+ try {
+ new JCommander(new WriteParameters(), args);
+ } catch(final ParameterException e) {
+ valid = false;
+ }
+ return valid;
+ }
+
+ /**
+ * @return Describes what arguments may be provided to the command.
+ */
+ @Override
+ public String getUsage() {
+ final JCommander parser = new JCommander(new WriteParameters());
+
+ final StringBuilder usage = new StringBuilder();
+ parser.usage(usage);
+ return usage.toString();
+ }
+
+ @Override
+ public void execute(final String[] args) throws ArgumentsException, ExecutionException {
+ requireNonNull(args);
+
+ // Parse the command line arguments.
+ final WriteParameters params = new WriteParameters();
+ try {
+ new JCommander(params, args);
+ } catch(final ParameterException e) {
+ throw new ArgumentsException("Could not write the statements because of invalid command line parameters.", e);
+ }
+
+ // Verify the configured statements file path.
+ final Path statementsPath = Paths.get(params.statementsFile);
+ if(!statementsPath.toFile().exists()) {
+ throw new ArgumentsException("Could not load statements at path '" + statementsPath + "' because that " +
+ "file does not exist. Make sure you've entered the correct path.");
+ }
+
+ // Create an RDF Parser whose format is derived from the statementPath's file extension.
+ final String filename = statementsPath.getFileName().toString();
+ final RDFFormat format = RdfFormatUtils.forFileName(filename);
+ if (format == null) {
+ throw new UnsupportedRDFormatException("Unknown RDF format for the file: " + filename);
+ }
+ final RDFParser parser = Rio.createParser(format);
+
+ // Set up the producer.
+ try(Producer<String, Set<Statement>> producer = makeProducer(params)) {
+ // Set a handler that writes the statements to the specified Kafka topic in batches of 5 Statements.
+ parser.setRDFHandler(new AbstractRDFHandler() {
+
+ private Set<Statement> batch = new HashSet<>(5);
+
+ @Override
+ public void startRDF() throws RDFHandlerException {
+ log.trace("Starting loading statements.");
+ }
+
+ @Override
+ public void handleStatement(final Statement stmnt) throws RDFHandlerException {
+ log.trace("Adding statement.");
+ batch.add(stmnt);
+
+ if(batch.size() == 5) {
+ flushBatch();
+ }
+ }
+
+ @Override
+ public void endRDF() throws RDFHandlerException {
+ if(!batch.isEmpty()) {
+ flushBatch();
+ }
+ log.trace("Done.");
+ }
+
+ private void flushBatch() {
+ log.trace("Flushing batch of size " + batch.size());
+ producer.send(new ProducerRecord<>(params.topic, null, batch));
+ batch = new HashSet<>(5);
+ producer.flush();
+ }
+ });
+
+ // Do the parse and load. The input stream is closed once parsing finishes.
+ try(java.io.InputStream in = Files.newInputStream(statementsPath)) {
+ parser.parse(in, "");
+ } catch (RDFParseException | RDFHandlerException | IOException e) {
+ throw new ExecutionException("Could not load the RDF file's Statements into the Kafka topic.", e);
+ }
+ }
+ }
+
+ private Producer<String, Set<Statement>> makeProducer(final KafkaParameters params) {
--- End diff --
Done.
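For reference, the batching the handler in this command performs (add each Statement to a set, send once the set holds 5, send any remainder in endRDF) can be sketched independently of Kafka and RDF4J. This is a minimal sketch: `BatchingSink` is a hypothetical name, and the `Consumer` is an assumed stand-in for `producer.send(...)` followed by `producer.flush()`.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;

/**
 * Hypothetical helper that buffers items and hands them to a sender in fixed-size
 * batches. The Consumer stands in for producer.send(...) followed by producer.flush().
 */
final class BatchingSink<T> {
    private final int batchSize;
    private final Consumer<Set<T>> sender;
    private Set<T> batch = new HashSet<>();

    public BatchingSink(final int batchSize, final Consumer<Set<T>> sender) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive.");
        }
        this.batchSize = batchSize;
        this.sender = Objects.requireNonNull(sender);
    }

    /** Like handleStatement(): buffer the item and send once the batch is full. */
    public void add(final T item) {
        batch.add(item);
        if (batch.size() == batchSize) {
            flush();
        }
    }

    /** Like endRDF(): send whatever is left over. */
    public void finish() {
        if (!batch.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        // Hand off the current set and start a new one, so the sender may keep a reference to it.
        sender.accept(batch);
        batch = new HashSet<>();
    }
}
```

Because the batch is a `HashSet`, a duplicate item does not count toward the batch size, which matches the behavior of the handler in the diff.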
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187657223
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+ }
+
+ // Use a RyaClient to see if the configured instance exists.
+ try {
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ config.getUsername(),
+ config.getPassword().toCharArray(),
+ config.getClusterName(),
+ config.getZookeepers());
+ final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
+ }
+
+ } catch (final RyaClientException e) {
+ throw new ConnectException("Unable to determine if the Rya Instance named " +
+ config.getRyaInstanceName() + " has been installed.", e);
+ }
+ }
+
+ @Override
+ protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Move the configuration into a Rya Configuration object.
+ final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration();
+ ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+ ryaConfig.setAccumuloZookeepers( config.getZookeepers() );
+ ryaConfig.setAccumuloInstance( config.getClusterName() );
+ ryaConfig.setAccumuloUser( config.getUsername() );
+ ryaConfig.setAccumuloPassword( config.getPassword() );
+
+ // Create the Sail object.
+ try {
+ return RyaSailFactory.getInstance(ryaConfig);
--- End diff --
I'm under the impression that this code will not work if the Geo jars aren't on the classpath.
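If that turns out to be a problem, one option is to probe for a required class up front and fail with a clear message instead of failing later inside RyaSailFactory. A minimal sketch, assuming you know the name of a class that ships in the Geo jars (the thread does not name one, so the caller supplies it); `ClasspathCheck` is a hypothetical helper, not part of Rya.

```java
/**
 * Hypothetical helper: reports whether a class can be loaded by a class loader
 * without running its static initializers.
 */
final class ClasspathCheck {
    private ClasspathCheck() { }

    public static boolean isOnClasspath(final String className, final ClassLoader loader) {
        try {
            // initialize = false: resolve the class only, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (final ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, e.g. a missing transitive dependency.
            return false;
        }
    }

    public static boolean isOnClasspath(final String className) {
        return isOnClasspath(className, ClasspathCheck.class.getClassLoader());
    }
}
```

Inside a sink task you would likely pass the task's own class loader (`getClass().getClassLoader()`), since that is the loader that has to see the jars.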
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187729976
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsSerializer implements Serializer<Set<Statement>> {
+ private static final Logger log = LoggerFactory.getLogger(StatementsSerializer.class);
+
+ private static final BinaryRDFWriterFactory WRITER_FACTORY = new BinaryRDFWriterFactory();
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ // Nothing to do.
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Set<Statement> data) {
+ if(data == null) {
+ // Returning null because that is the contract of this method.
+ return null;
+ }
+
+ // Write the statements using a Binary RDF Writer.
+ final ByteArrayOutputStream boas = new ByteArrayOutputStream();
--- End diff --
Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187627726
--- Diff: extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java ---
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.test.accumulo.AccumuloITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link AccumuloRyaSinkTask}.
--- End diff --
Typo: should read "Integration tests for the methods..."
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187729813
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java ---
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFHandler;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized
+ * set of {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsDeserializer implements Deserializer<Set<Statement>> {
+ private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class);
+
+ private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory();
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ // Nothing to do.
+ }
+
+ @Override
+ public Set<Statement> deserialize(final String topic, final byte[] data) {
+ if(data == null || data.length == 0) {
+ // Return null because that is the contract of this method.
+ return null;
+ }
+
+ try {
+ final RDFParser parser = PARSER_FACTORY.getParser();
+ final Set<Statement> statements = new HashSet<>();
+
+ parser.setRDFHandler(new RDFHandler() {
+ @Override
+ public void handleStatement(final Statement statement) throws RDFHandlerException {
+ log.debug("Statement: " + statement);
+ statements.add( statement );
+ }
+
+ @Override public void startRDF() throws RDFHandlerException { }
+ @Override public void handleNamespace(final String arg0, final String arg1) throws RDFHandlerException { }
+ @Override public void handleComment(final String arg0) throws RDFHandlerException { }
+ @Override public void endRDF() throws RDFHandlerException { }
+ });
+
+ parser.parse(new ByteArrayInputStream(data), null);
+ return statements;
+
+ } catch(final RDFParseException | RDFHandlerException | IOException e) {
+ log.error("Could not deserialize a Set of Statement objects using the RDF4J Rio Binary format.", e);
--- End diff --
Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187701955
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+ }
+
+ // Use a RyaClient to see if the configured instance exists.
+ try {
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
--- End diff --
A quick nice-to-have idea for future work: we have a ConnectionDetails -> Configuration conversion, but not the other way around. Having that would make code like this pretty easy.
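A minimal sketch of what the reverse conversion could look like. `SimpleRyaConfig` and `SimpleConnectionDetails` are hypothetical stand-ins that only carry the fields this sink copies around (user, password, Accumulo instance, zookeepers); they are not Rya's `AccumuloRdfConfiguration` or `AccumuloConnectionDetails`.

```java
import java.util.Objects;

/** Hypothetical stand-in for the Accumulo settings the sink copies into its Rya configuration. */
final class SimpleRyaConfig {
    final String zookeepers;
    final String instance;
    final String user;
    final String password;

    SimpleRyaConfig(final String zookeepers, final String instance, final String user, final String password) {
        this.zookeepers = Objects.requireNonNull(zookeepers);
        this.instance = Objects.requireNonNull(instance);
        this.user = Objects.requireNonNull(user);
        this.password = Objects.requireNonNull(password);
    }
}

/** Hypothetical stand-in for the values the connection details are built from in the diff. */
final class SimpleConnectionDetails {
    final String username;
    final char[] password;
    final String instanceName;
    final String zookeepers;

    SimpleConnectionDetails(final String username, final char[] password,
                            final String instanceName, final String zookeepers) {
        this.username = Objects.requireNonNull(username);
        this.password = Objects.requireNonNull(password).clone();
        this.instanceName = Objects.requireNonNull(instanceName);
        this.zookeepers = Objects.requireNonNull(zookeepers);
    }

    /** The missing direction: derive connection details from a configuration. */
    static SimpleConnectionDetails fromConfiguration(final SimpleRyaConfig conf) {
        Objects.requireNonNull(conf);
        return new SimpleConnectionDetails(conf.user, conf.password.toCharArray(), conf.instance, conf.zookeepers);
    }
}
```

The Rya instance name is not part of the connection details in the diff (it is passed separately to `getInstanceExists()`), so the sketch leaves it out.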
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187718927
--- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConnector extends RyaSinkConnector {
+
+ @Nullable
+ private MongoRyaSinkConfig config = null;
+
+ @Override
+ public void start(final Map<String, String> props) {
+ this.config = new MongoRyaSinkConfig( props );
+ }
+
+ @Override
+ protected AbstractConfig getConfig() {
+ if(config == null) {
+ throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
--- End diff --
Same as above: the message should say start(Map).
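A minimal sketch of the guard being discussed, with the exception message naming the `start(Map)` lifecycle method. `LifecycleGuardedConnector` is hypothetical and stores the raw property map rather than a `MongoRyaSinkConfig`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the start(Map)-then-getConfig() lifecycle guard. */
class LifecycleGuardedConnector {
    private Map<String, String> config = null;

    public void start(final Map<String, String> props) {
        // Defensive copy so later changes to the caller's map are not observed.
        this.config = new HashMap<>(props);
    }

    protected Map<String, String> getConfig() {
        if (config == null) {
            throw new IllegalStateException("The configuration has not been set yet. Invoke start(Map) first.");
        }
        return config;
    }
}
```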
---
[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:
https://github.com/apache/incubator-rya/pull/296
Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/760/
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187717488
--- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java ---
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConfig extends RyaSinkConfig {
+
+ public static final String HOSTNAME = "mongo.hostname";
+ private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections wlll use.";
--- End diff --
Typo: "wlll" should be "will".
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187732940
--- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java ---
@@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
return Optional.fromNullable(conf.get(FLUO_APP_NAME));
}
+ public static void setUseMongo(final Configuration conf, final boolean useMongo) {
--- End diff --
I don't want to mess with how our configuration objects are initialized within the scope of this ticket.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r188060281
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java ---
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFHandler;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized
+ * set of {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsDeserializer implements Deserializer<Set<Statement>> {
+ private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class);
+
+ private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory();
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ // Nothing to do.
+ }
+
+ @Override
+ public Set<Statement> deserialize(final String topic, final byte[] data) {
+ if(data == null || data.length == 0) {
+ // Return null because that is the contract of this method.
+ return null;
+ }
+
+ try {
+ final RDFParser parser = PARSER_FACTORY.getParser();
+ final Set<Statement> statements = new HashSet<>();
+
+ parser.setRDFHandler(new RDFHandler() {
--- End diff --
Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187733815
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+ }
+
+ // Use a RyaClient to see if the configured instance exists.
+ try {
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
--- End diff --
That's true. Feel free to write an improvement ticket for that.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187731624
--- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java ---
@@ -274,16 +274,17 @@ public boolean getUseAggregationPipeline() {
* on their child subtrees.
* @param value whether to use aggregation pipeline optimization.
*/
- public void setUseAggregationPipeline(boolean value) {
+ public void setUseAggregationPipeline(final boolean value) {
setBoolean(USE_AGGREGATION_PIPELINE, value);
}
@Override
public List<Class<QueryOptimizer>> getOptimizers() {
- List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
+ final List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
if (getUseAggregationPipeline()) {
- Class<?> cl = AggregationPipelineQueryOptimizer.class;
+ final Class<?> cl = AggregationPipelineQueryOptimizer.class;
@SuppressWarnings("unchecked")
+ final
--- End diff --
Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187980159
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java ---
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFHandler;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized
+ * set of {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsDeserializer implements Deserializer<Set<Statement>> {
+ private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class);
+
+ private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory();
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ // Nothing to do.
+ }
+
+ @Override
+ public Set<Statement> deserialize(final String topic, final byte[] data) {
+ if(data == null || data.length == 0) {
+ // Return null because that is the contract of this method.
+ return null;
+ }
+
+ try {
+ final RDFParser parser = PARSER_FACTORY.getParser();
+ final Set<Statement> statements = new HashSet<>();
+
+ parser.setRDFHandler(new RDFHandler() {
--- End diff --
Extend AbstractRDFHandler instead, since only handleStatement() needs to be overridden; the other overridden methods can then be deleted.
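Below is a minimal sketch of the suggested change. It assumes RDF4J 2.x is on the classpath. AbstractRDFHandler (in org.eclipse.rdf4j.rio.helpers) already supplies no-op startRDF, endRDF, handleNamespace and handleComment, so the anonymous handler only has to override handleStatement. BinaryStatementsReader is a hypothetical helper, not the PR's StatementsDeserializer. BASE_URI is a placeholder, because the base URI argument is not visible in the diff. Wrapping IOException in UncheckedIOException is also an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;

import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.rio.RDFParser;
import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;

/** Hypothetical helper showing the AbstractRDFHandler approach. */
final class BinaryStatementsReader {
    private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory();

    // Placeholder base URI; the binary format stores absolute values, so it is not used to resolve anything here.
    private static final String BASE_URI = "http://example.com/";

    private BinaryStatementsReader() { }

    /** Returns null for null or empty input, mirroring the Deserializer contract noted in the diff. */
    static Set<Statement> read(final byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }

        final Set<Statement> statements = new HashSet<>();
        final RDFParser parser = PARSER_FACTORY.getParser();

        // Only handleStatement is overridden; AbstractRDFHandler provides no-op versions of the rest.
        parser.setRDFHandler(new AbstractRDFHandler() {
            @Override
            public void handleStatement(final Statement statement) {
                statements.add(statement);
            }
        });

        try {
            parser.parse(new ByteArrayInputStream(data), BASE_URI);
        } catch (final IOException e) {
            // Assumption: surface I/O failures as an unchecked exception.
            throw new UncheckedIOException(e);
        }
        return statements;
    }
}
```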
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187739974
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+ private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
+
+ @Nullable
+ private SailRepository sailRepo = null;
+
+ @Nullable
+ private SailRepositoryConnection conn = null;
+
+ /**
+ * Throws an exception if the configured Rya Instance is not already installed
+ * within the configured database.
+ *
+ * @param taskConfig - The configuration values that were provided to the task. (not null)
+ * @throws ConnectException The configured Rya Instance is not installed to the configured database
+ * or we were unable to figure out if it is installed.
+ */
+ protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+ /**
+ * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
+ * Rya Instance.
+ *
+ * @param taskConfig - Configures how the Sail object will be created. (not null)
+ * @return The created Sail object.
+ * @throws ConnectException The Sail object could not be made.
+ */
+ protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+ @Override
+ public String version() {
+ return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
+ }
+
+ @Override
+ public void start(final Map<String, String> props) throws ConnectException {
+ requireNonNull(props);
+
+ // Ensure the configured Rya Instance is installed within the configured database.
+ checkRyaInstanceExists(props);
+
+ // Create the Sail object that is connected to the Rya Instance.
+ final Sail sail = makeSail(props);
+ sailRepo = new SailRepository( sail );
+ conn = sailRepo.getConnection();
+ }
+
+ @Override
+ public void put(final Collection<SinkRecord> records) {
+ requireNonNull(records);
+
+ // Return immediately if there are no records to handle.
+ if(records.isEmpty()) {
+ return;
+ }
+
+ // If a transaction has not been started yet, then start one.
+ if(!conn.isActive()) {
+ conn.begin();
+ }
+
+ // Iterate through the records and write them to the Sail object.
+ for(final SinkRecord record : records) {
+ // If everything has been configured correctly, then the record's value will be a Set<Statement>.
+ conn.add((Set<? extends Statement>) record.value());
+ }
+ }
+
+ @Override
+ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffets) {
+ // Flush the current transaction.
+ conn.commit();
+ }
+
+ @Override
+ public void stop() {
+ try {
--- End diff --
👍
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187735067
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --
"/**
* Indicates that all members of the class or package should be annotated with the default value of the supplied
* annotation class. This would be used for behavior annotations such as @NonNull, @CheckForNull,
* or @CheckReturnValue. In particular, you can use @DefaultAnnotation(NonNull.class) on a class or package,
* and then use @Nullable only on those parameters, methods or fields that you want to allow to be null.
*/"
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187734176
--- Diff: extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java ---
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link AccumuloRyaSinkConfig}.
+ */
+public class AccumuloRyaSinkConfigTest {
+
+ @Test
+ public void parses() {
--- End diff --
Could you give an example of a malformed field? Do you just mean fields that are not part of the schema? That's not illegal. They just get ignored.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187636404
--- Diff: extras/kafka.connect/client/pom.xml ---
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.kafka.connect.parent</artifactId>
+ <version>4.0.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rya.kafka.connect.client</artifactId>
+
+ <name>Apache Rya Kafka Connect - Client</name>
+ <description>Contains a client that may be used to load Statements into
+ a Kafka topic to be read by Kafka Connect.</description>
+
+ <dependencies>
+ <!-- 1st party dependencies. -->
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.sail</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.kafka.connect.api</artifactId>
+ </dependency>
+
+ <!-- 3rd party dependencies. -->
+ <dependency>
+ <groupId>org.eclipse.rdf4j</groupId>
+ <artifactId>rdf4j-model</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.github.stephenc.findbugs</groupId>
+ <artifactId>findbugs-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+
+ <!-- Statement formats we support for loading. -->
--- End diff --
I think pulling in rya.sail as a dependency already gives you all the RDF formats, so this probably isn't needed.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187729157
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
--- End diff --
Done.
---
[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:
https://github.com/apache/incubator-rya/pull/296
Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/761/
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187717175
--- Diff: extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.test.mongo.MongoITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link MongoRyaSinkTask}.
+ */
+public class MongoRyaSinkTaskIT extends MongoITBase {
+
+ @Test
+ public void instanceExists() throws Exception {
+ // Install an instance of Rya.
+ final String ryaInstanceName = "rya";
+ final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+ super.getMongoHostname(),
+ super.getMongoPort(),
+ Optional.empty(),
+ Optional.empty());
+
+ final InstallConfiguration installConfig = InstallConfiguration.builder()
+ .setEnableTableHashPrefix(false)
+ .setEnableEntityCentricIndex(false)
+ .setEnableFreeTextIndex(false)
+ .setEnableTemporalIndex(false)
+ .setEnablePcjIndex(false)
+ .setEnableGeoIndex(false)
+ .build();
+
+ final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, super.getMongoClient());
+ ryaClient.getInstall().install(ryaInstanceName, installConfig);
+
+ // Create the task that will be tested.
+ final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+ try {
+ // Configure the task to use the embedded Mongo DB instance for Rya.
+ final Map<String, String> config = new HashMap<>();
+ config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
+ config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
+ config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "rya");
+
+ // This will pass because the Rya instance exists.
+ task.start(config);
+ } finally {
+ task.stop();
+ }
+ }
+
+ @Test(expected = ConnectException.class)
+ public void instanceDoesNotExist() throws Exception {
+ // Create the task that will be tested.
+ final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+ try {
+ // Configure the task to use the embedded Mongo DB instance for Rya.
+ final Map<String, String> config = new HashMap<>();
+ config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
+ config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
+ config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "instance-does-not-exist");
+
+ // Starting the task will fail because the Rya instance does not exist.
+ task.start(config);
+ } finally {
+ task.stop();
+ }
+ }
+
+ // TODO show that inserts using visibilities work.
--- End diff --
TODO?
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187733573
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --
That field is nullable because this is a stateful object, but the parameters passed into start(...) may not be null. I'll add a null check there.
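Here is a minimal sketch of the pattern described above. The field stays nullable because the connector is stateful, while start(Map) rejects a null argument up front. ConnectorNullCheckSketch is a hypothetical stand-in for AccumuloRyaSinkConnector. A plain Map replaces AccumuloRyaSinkConfig so that the sketch has no Kafka or Rya dependencies.

```java
import static java.util.Objects.requireNonNull;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for AccumuloRyaSinkConnector; a Map replaces AccumuloRyaSinkConfig. */
class ConnectorNullCheckSketch {

    // Nullable on purpose: the connector is stateful and this stays null until start(Map) runs.
    private Map<String, String> config = null;

    /**
     * @param props - The connector's configuration values. (not null)
     */
    public void start(final Map<String, String> props) {
        // Unlike the field, the argument must never be null.
        requireNonNull(props);
        this.config = Collections.unmodifiableMap(new HashMap<>(props));
    }

    /**
     * @return The configuration captured by {@link #start(Map)}.
     * @throws IllegalStateException If start(Map) has not been invoked yet.
     */
    protected Map<String, String> getConfig() {
        if (config == null) {
            throw new IllegalStateException("The configuration has not been set yet. Invoke start(Map) first.");
        }
        return config;
    }
}
```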
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187732007
--- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java ---
@@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() {
super();
}
- public AccumuloRdfConfiguration(Configuration other) {
+ public AccumuloRdfConfiguration(final Configuration other) {
super(other);
}
- public AccumuloRdfConfigurationBuilder getBuilder() {
+ public static AccumuloRdfConfigurationBuilder getBuilder() {
--- End diff --
For me it is. I don't want to rename the method everywhere it is used in this review, though. I just needed it to be static so that I could use it.
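While getBuilder() was an instance method, callers had to construct a configuration object just to reach it, for example new AccumuloRdfConfiguration().getBuilder(). Because the method could simply be made static, it cannot have depended on instance state. The sketch below shows the static form using a hypothetical ExampleRdfConfiguration class with one illustrative property. The real AccumuloRdfConfigurationBuilder exposes its own setters, which are not modelled here.

```java
/** Hypothetical stand-in for a configuration class whose builder factory is static. */
final class ExampleRdfConfiguration {
    private final String tablePrefix;

    private ExampleRdfConfiguration(final String tablePrefix) {
        this.tablePrefix = tablePrefix;
    }

    String getTablePrefix() {
        return tablePrefix;
    }

    /**
     * Static, so callers can start from the class itself instead of
     * constructing a throwaway configuration object first.
     */
    static Builder getBuilder() {
        return new Builder();
    }

    static final class Builder {
        // Hypothetical default, chosen only for this sketch.
        private String tablePrefix = "rya_";

        Builder setTablePrefix(final String tablePrefix) {
            this.tablePrefix = tablePrefix;
            return this;
        }

        ExampleRdfConfiguration build() {
            return new ExampleRdfConfiguration(tablePrefix);
        }
    }
}
```

Usage: ExampleRdfConfiguration.getBuilder().setTablePrefix("test_").build(). The method name is left as getBuilder(), matching the decision not to rename it in this review.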
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187708244
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkConnector extends RyaSinkConnector {
+
+ @Nullable
+ private AccumuloRyaSinkConfig config = null;
+
+ @Override
+ public void start(final Map<String, String> props) {
+ this.config = new AccumuloRyaSinkConfig( props );
+ }
+
+ @Override
+ protected AbstractConfig getConfig() {
+ if(config == null) {
+ throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
--- End diff --
Usually a documented method reference lists the parameter types, not the parameter names: start(Map).
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187698795
--- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java ---
@@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
return Optional.fromNullable(conf.get(FLUO_APP_NAME));
}
+ public static void setUseMongo(final Configuration conf, final boolean useMongo) {
--- End diff --
Can you add this to the MongoDBRdfConfiguration constructor? If we're going to keep using this field, it would make sense for the constructor to set it.
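Below is a minimal sketch of the suggestion, assuming the flag is a plain boolean property. The Mongo-specific configuration sets the flag in its own constructor, so callers no longer have to remember to call setUseMongo(...) themselves. SimpleConfiguration, ConfigUtilsSketch, MongoRdfConfigurationSketch and the key "example.useMongo" are all hypothetical stand-ins for Hadoop's Configuration, Rya's ConfigUtils and MongoDBRdfConfiguration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, minimal stand-in for Hadoop's Configuration (boolean properties only). */
class SimpleConfiguration {
    private final Map<String, String> props = new HashMap<>();

    void setBoolean(final String key, final boolean value) {
        props.put(key, Boolean.toString(value));
    }

    boolean getBoolean(final String key, final boolean defaultValue) {
        final String value = props.get(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }
}

/** Mirrors the static setter added to ConfigUtils in this PR; the key name is hypothetical. */
final class ConfigUtilsSketch {
    static final String USE_MONGO = "example.useMongo";

    private ConfigUtilsSketch() { }

    static void setUseMongo(final SimpleConfiguration conf, final boolean useMongo) {
        conf.setBoolean(USE_MONGO, useMongo);
    }

    static boolean getUseMongo(final SimpleConfiguration conf) {
        return conf.getBoolean(USE_MONGO, false);
    }
}

/** Hypothetical stand-in for MongoDBRdfConfiguration: the constructor sets the flag itself. */
class MongoRdfConfigurationSketch extends SimpleConfiguration {
    MongoRdfConfigurationSketch() {
        super();
        // A Mongo-backed configuration always implies useMongo = true.
        ConfigUtilsSketch.setUseMongo(this, true);
    }
}
```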
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187739762
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
--- End diff --
Fair enough. It just seemed like you were delegating the actual serialization to something else.
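For reference, here is a sketch of how delegating to RDF4J's Rio binary writer can look. This is what the reviewer noticed: the Kafka Serializer itself only drives the writer. BinaryStatementsWriter is a hypothetical class name, and the PR's StatementsSerializer body is not shown in this diff, so this is not its actual implementation. The sketch assumes RDF4J 2.x, where startRDF, handleStatement and endRDF throw only the unchecked RDFHandlerException.

```java
import java.io.ByteArrayOutputStream;
import java.util.Set;

import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.rio.RDFWriter;
import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;

/** Hypothetical helper: all encoding work is delegated to RDF4J's binary RDFWriter. */
final class BinaryStatementsWriter {
    private static final BinaryRDFWriterFactory WRITER_FACTORY = new BinaryRDFWriterFactory();

    private BinaryStatementsWriter() { }

    /** Returns null for null input, following the usual Kafka Serializer convention. */
    static byte[] write(final Set<Statement> statements) {
        if (statements == null) {
            return null;
        }

        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        final RDFWriter writer = WRITER_FACTORY.getWriter(out);

        // The writer emits the binary header, one record per statement, and an end marker.
        writer.startRDF();
        for (final Statement statement : statements) {
            writer.handleStatement(statement);
        }
        writer.endRDF();

        return out.toByteArray();
    }
}
```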
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187739445
--- Diff: extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java ---
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link AccumuloRyaSinkConfig}.
+ */
+public class AccumuloRyaSinkConfigTest {
+
+ @Test
+ public void parses() {
--- End diff --
Gotcha. The test just seemed to be missing an error case.
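An error case for a config test usually pins down what happens when a required value is absent. The sketch below is hypothetical: `SinkConfigSketch` stands in for `AccumuloRyaSinkConfig`, the key names are assumed for illustration, and it throws `IllegalArgumentException` rather than whatever exception the real Kafka-based config raises.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-in for AccumuloRyaSinkConfig: it rejects configurations that are
 * missing any required key, which is the behavior an error-case test would verify.
 */
final class SinkConfigSketch {
    // Assumed key names, for illustration only.
    static final List<String> REQUIRED_KEYS = List.of(
            "zookeepers", "cluster.name", "username", "password", "rya.instance.name");

    private final Map<String, String> values;

    SinkConfigSketch(final Map<String, String> props) {
        for (final String key : REQUIRED_KEYS) {
            final String value = props.get(key);
            if (value == null || value.isEmpty()) {
                throw new IllegalArgumentException("Missing required configuration: " + key);
            }
        }
        this.values = new HashMap<>(props);
    }

    String get(final String key) {
        return values.get(key);
    }

    /** Returns true only if constructing a config from these props fails. */
    static boolean rejects(final Map<String, String> props) {
        try {
            new SinkConfigSketch(props);
            return false;
        } catch (final IllegalArgumentException e) {
            return true;
        }
    }
}
```

In the actual JUnit class this would become a second method next to `parses()`, annotated with `@Test(expected = ...)` in the same style as `instanceDoesNotExist()` in `MongoRyaSinkTaskIT`.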
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187731454
--- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+ @Nullable
+ final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+ @Nullable
+ final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+ // Connect a Mongo Client to the configured Mongo DB instance.
+ final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+ final boolean hasCredentials = username != null && password != null;
+
+ try(MongoClient mongoClient = hasCredentials ?
+ new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+ new MongoClient(serverAddr)) {
+ // Use a RyaClient to see if the configured instance exists.
+ // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+ final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+ config.getHostname(),
+ config.getPort(),
+ Optional.ofNullable(username),
+ Optional.ofNullable(password));
+
+ final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
+ }
+ } catch(final RyaClientException e) {
+ throw new ConnectException("Unable to determine if the Rya Instance named " +
+ config.getRyaInstanceName() + " has been installed.", e);
+ }
+ }
+
+ @Override
+ protected Sail makeSail(final Map<String, String> taskConfig) {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+
+ // Move the configuration into a Rya Configuration object.
+ final MongoDBRdfConfiguration ryaConfig = new MongoDBRdfConfiguration();
+ ConfigUtils.setUseMongo(ryaConfig, true);
+ ryaConfig.setMongoDBName( config.getRyaInstanceName() );
+ ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+ ryaConfig.setMongoHostname( config.getHostname() );
+ ryaConfig.setMongoPort( "" + config.getPort() );
+
+ if(!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) {
+ ryaConfig.setMongoUser( config.getUsername() );
+ ryaConfig.setMongoPassword( config.getPassword() );
+ }
+
+ // Create the Sail object.
+ try {
+ return RyaSailFactory.getInstance(ryaConfig);
--- End diff --
Added documentation to the future work section of the manual. Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187730101
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java ---
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Handles the common components required to task {@link RyaSinkTask}s that write to Rya.
+ * </p>
+ * Implementations of this class only need to specify functionality that is specific to the Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkConnector extends SinkConnector {
+
+ /**
+ * Get the configuration that will be provided to the tasks when {@link #taskConfigs(int)} is invoked.
+ * </p>
+ * Only called after start has been invoked
+ *
+ * @return The configuration object for the connector.
+ * @throws IllegalStateException Thrown if {@link SinkConnector#start(Map)} has not been invoked yet.
+ */
+ protected abstract AbstractConfig getConfig() throws IllegalStateException;
+
+ @Override
+ public String version() {
+ return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN";
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(final int maxTasks) {
+ final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
+ for(int i = 0; i < maxTasks; i++) {
+ configs.add( getConfig().originalsStrings() );
+ }
+ return configs;
+ }
+
+ @Override
+ public void stop() {
+ // Nothing to do since the RyaSinkconnector has no background monitoring.
--- End diff --
Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187730515
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+ private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
+
+ @Nullable
+ private SailRepository sailRepo = null;
+
+ @Nullable
+ private SailRepositoryConnection conn = null;
+
+ /**
+ * Throws an exception if the configured Rya Instance is not already installed
+ * within the configured database.
+ *
+ * @param taskConfig - The configuration values that were provided to the task. (not null)
+ * @throws ConnectException The configured Rya Instance is not installed to the configured database
+ * or we were unable to figure out if it is installed.
+ */
+ protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+ /**
+ * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
+ * Rya Instance.
+ *
+ * @param taskConfig - Configures how the Sail object will be created. (not null)
+ * @return The created Sail object.
+ * @throws ConnectException The Sail object could not be made.
+ */
+ protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+ @Override
+ public String version() {
+ return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
+ }
+
+ @Override
+ public void start(final Map<String, String> props) throws ConnectException {
+ requireNonNull(props);
+
+ // Ensure the configured Rya Instance is installed within the configured database.
+ checkRyaInstanceExists(props);
+
+ // Create the Sail object that is connected to the Rya Instance.
+ final Sail sail = makeSail(props);
+ sailRepo = new SailRepository( sail );
+ conn = sailRepo.getConnection();
+ }
+
+ @Override
+ public void put(final Collection<SinkRecord> records) {
+ requireNonNull(records);
+
+ // Return immediately if there are no records to handle.
+ if(records.isEmpty()) {
+ return;
+ }
+
+ // If a transaction has not been started yet, then start one.
+ if(!conn.isActive()) {
+ conn.begin();
+ }
+
+ // Iterate through the records and write them to the Sail object.
+ for(final SinkRecord record : records) {
+ // If everything has been configured correctly, then the record's value will be a Set<Statement>.
+ conn.add((Set<? extends Statement>) record.value());
+ }
+ }
+
+ @Override
+ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffets) {
+ // Flush the current transaction.
+ conn.commit();
--- End diff --
Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187634433
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+ private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
+
+ @Nullable
+ private SailRepository sailRepo = null;
+
+ @Nullable
+ private SailRepositoryConnection conn = null;
+
+ /**
+ * Throws an exception if the configured Rya Instance is not already installed
+ * within the configured database.
+ *
+ * @param taskConfig - The configuration values that were provided to the task. (not null)
+ * @throws ConnectException The configured Rya Instance is not installed to the configured database
+ * or we were unable to figure out if it is installed.
+ */
+ protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+ /**
+ * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
+ * Rya Instance.
+ *
+ * @param taskConfig - Configures how the Sail object will be created. (not null)
+ * @return The created Sail object.
+ * @throws ConnectException The Sail object could not be made.
+ */
+ protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+ @Override
+ public String version() {
+ return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
+ }
+
+ @Override
+ public void start(final Map<String, String> props) throws ConnectException {
+ requireNonNull(props);
+
+ // Ensure the configured Rya Instance is installed within the configured database.
+ checkRyaInstanceExists(props);
+
+ // Create the Sail object that is connected to the Rya Instance.
+ final Sail sail = makeSail(props);
+ sailRepo = new SailRepository( sail );
+ conn = sailRepo.getConnection();
+ }
+
+ @Override
+ public void put(final Collection<SinkRecord> records) {
+ requireNonNull(records);
+
+ // Return immediately if there are no records to handle.
+ if(records.isEmpty()) {
+ return;
+ }
+
+ // If a transaction has not been started yet, then start one.
+ if(!conn.isActive()) {
+ conn.begin();
+ }
+
+ // Iterate through the records and write them to the Sail object.
+ for(final SinkRecord record : records) {
+ // If everything has been configured correctly, then the record's value will be a Set<Statement>.
+ conn.add((Set<? extends Statement>) record.value());
+ }
+ }
+
+ @Override
+ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffets) {
+ // Flush the current transaction.
+ conn.commit();
--- End diff --
Add a null check.
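Because `conn` is `@Nullable` and only assigned in `start()`, one way to satisfy the review is to guard `flush()` (and `stop()`) so they are no-ops when there is no connection or no open transaction. The sketch below is hypothetical: `TxConnection` is an assumed minimal interface standing in for `SailRepositoryConnection`, and `FakeConnection` exists only to exercise the guard without Rya.

```java
import java.util.Collection;

/** Hypothetical minimal view of a transactional connection (stands in for SailRepositoryConnection). */
interface TxConnection {
    boolean isActive();
    void begin();
    void commit();
    void close();
}

/** Hypothetical fake that counts commits and refuses to commit without an open transaction. */
final class FakeConnection implements TxConnection {
    private boolean active = false;
    int commits = 0;

    @Override public boolean isActive() { return active; }
    @Override public void begin() { active = true; }
    @Override public void commit() {
        if (!active) {
            throw new IllegalStateException("No active transaction");
        }
        active = false;
        commits++;
    }
    @Override public void close() { active = false; }
}

/** Sketch of the sink task's put/flush/stop with the requested null check. */
final class GuardedSinkTask {
    private TxConnection conn = null; // stays null until start() succeeds

    void start(final TxConnection connection) {
        conn = connection;
    }

    void put(final Collection<String> records) {
        if (records.isEmpty()) {
            return;
        }
        // As in the original, put() assumes start() succeeded.
        if (!conn.isActive()) {
            conn.begin();
        }
        // ... each record would be added to the connection here ...
    }

    void flush() {
        // Skip the commit if start() never completed or nothing was written since the last flush.
        if (conn != null && conn.isActive()) {
            conn.commit();
        }
    }

    void stop() {
        if (conn != null) {
            conn.close();
            conn = null;
        }
    }
}
```

Checking `isActive()` as well as `null` also covers a flush that arrives when `put()` only received empty batches, since no transaction was begun in that case.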
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187731377
--- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+ @Nullable
+ final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+ @Nullable
+ final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+ // Connect a Mongo Client to the configured Mongo DB instance.
+ final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+ final boolean hasCredentials = username != null && password != null;
+
+ try(MongoClient mongoClient = hasCredentials ?
+ new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+ new MongoClient(serverAddr)) {
+ // Use a RyaClient to see if the configured instance exists.
+ // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+ final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+ config.getHostname(),
+ config.getPort(),
+ Optional.ofNullable(username),
+ Optional.ofNullable(password));
+
+ final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
--- End diff --
Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187736039
--- Diff: extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.test.mongo.MongoITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link MongoRyaSinkTask}.
+ */
+public class MongoRyaSinkTaskIT extends MongoITBase {
+
+ @Test
+ public void instanceExists() throws Exception {
+ // Install an instance of Rya.
+ final String ryaInstanceName = "rya";
+ final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+ super.getMongoHostname(),
+ super.getMongoPort(),
+ Optional.empty(),
+ Optional.empty());
+
+ final InstallConfiguration installConfig = InstallConfiguration.builder()
+ .setEnableTableHashPrefix(false)
+ .setEnableEntityCentricIndex(false)
+ .setEnableFreeTextIndex(false)
+ .setEnableTemporalIndex(false)
+ .setEnablePcjIndex(false)
+ .setEnableGeoIndex(false)
+ .build();
+
+ final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, super.getMongoClient());
+ ryaClient.getInstall().install(ryaInstanceName, installConfig);
+
+ // Create the task that will be tested.
+ final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+ try {
+ // Configure the task to use the embedded Mongo DB instance for Rya.
+ final Map<String, String> config = new HashMap<>();
+ config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
+ config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
+ config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "rya");
+
+ // This will pass because the Rya instance exists.
+ task.start(config);
+ } finally {
+ task.stop();
+ }
+ }
+
+ @Test(expected = ConnectException.class)
+ public void instanceDoesNotExist() throws Exception {
+ // Create the task that will be tested.
+ final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+ try {
+ // Configure the task to use the embedded Mongo DB instance for Rya.
+ final Map<String, String> config = new HashMap<>();
+ config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
+ config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
+ config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "instance-does-not-exist");
+
+ // Starting the task will fail because the Rya instance does not exist.
+ task.start(config);
+ } finally {
+ task.stop();
+ }
+ }
+
+ // TODO show that inserts using visibilities work.
--- End diff --
Oh yeah, we're not supporting that. Let me delete that comment.
---
[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:
https://github.com/apache/incubator-rya/pull/296
Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/762/
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187631961
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+ }
+
+ // Use a RyaClient to see if the configured instance exists.
+ try {
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ config.getUsername(),
+ config.getPassword().toCharArray(),
+ config.getClusterName(),
+ config.getZookeepers());
+ final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
+ }
+
+ } catch (final RyaClientException e) {
+ throw new ConnectException("Unable to determine if the Rya Instance named " +
+ config.getRyaInstanceName() + " has been installed.", e);
+ }
+ }
+
+ @Override
+ protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Move the configuration into a Rya Configuration object.
+ final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration();
+ ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+ ryaConfig.setAccumuloZookeepers( config.getZookeepers() );
+ ryaConfig.setAccumuloInstance( config.getClusterName() );
+ ryaConfig.setAccumuloUser( config.getUsername() );
+ ryaConfig.setAccumuloPassword( config.getPassword() );
+
+ // Create the Sail object.
+ try {
+ return RyaSailFactory.getInstance(ryaConfig);
--- End diff --
This should support geo as well:
```
if (OptionalConfigUtils.getUseGeo(ryaConfig)) {
    return GeoRyaSailFactory.getInstance(ryaConfig);
} else {
    return RyaSailFactory.getInstance(ryaConfig);
}
```
We really need a better mechanism than this. Maybe someday we'll make plugins for different Sails.
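The plugin idea could look roughly like the minimal sketch below. Everything in it is hypothetical: `SailPlugin`, `SailPluginRegistry`, and the `use.geo` key are illustrative names, not Rya APIs, and the type parameter `T` stands in for RDF4J's `Sail` so the sketch compiles on its own. Each plugin reports whether it applies to a task configuration; the first match builds the Sail, and otherwise a default factory is used. This generalizes the geo/non-geo `if` above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical plugin that knows when it applies and how to build a Sail-like object T. */
interface SailPlugin<T> {
    boolean supports(Map<String, String> config);

    T create(Map<String, String> config);

    /** Convenience factory so a plugin can be declared with two lambdas. */
    static <T> SailPlugin<T> of(final Predicate<Map<String, String>> matcher,
                                final Function<Map<String, String>, T> factory) {
        return new SailPlugin<T>() {
            @Override
            public boolean supports(final Map<String, String> config) {
                return matcher.test(config);
            }

            @Override
            public T create(final Map<String, String> config) {
                return factory.apply(config);
            }
        };
    }
}

/** Hypothetical registry: the first registered plugin that supports the config wins, else the fallback. */
final class SailPluginRegistry<T> {
    private final List<SailPlugin<T>> plugins = new ArrayList<>();
    private final SailPlugin<T> fallback;

    SailPluginRegistry(final SailPlugin<T> fallback) {
        this.fallback = fallback;
    }

    SailPluginRegistry<T> register(final SailPlugin<T> plugin) {
        plugins.add(plugin);
        return this;
    }

    T create(final Map<String, String> config) {
        // Plugins are checked in registration order.
        for (final SailPlugin<T> plugin : plugins) {
            if (plugin.supports(config)) {
                return plugin.create(config);
            }
        }
        return fallback.create(config);
    }
}
```

In this sketch a geo plugin would match on the geo flag and the plain factory would be the fallback. The real Sail factories throw checked exceptions, so an actual version would need a throwing functional interface instead of `java.util.function.Function`.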
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187739297
--- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java ---
@@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
return Optional.fromNullable(conf.get(FLUO_APP_NAME));
}
+ public static void setUseMongo(final Configuration conf, final boolean useMongo) {
--- End diff --
Sure, makes sense.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187728842
--- Diff: extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java ---
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.test.accumulo.AccumuloITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link AccumuloRyaSinkTask}.
--- End diff --
Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187699543
--- Diff: extras/kafka.connect/README.md ---
@@ -0,0 +1,22 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. -->
+
+The parent project for all Rya Kafka Connect work. All projects thare are part
+of that system must use this project's pom as their parent pom.
--- End diff --
Typo: "All projects that are..." (not "thare").
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187632758
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java ---
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFHandler;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized
+ * set of {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsDeserializer implements Deserializer<Set<Statement>> {
+ private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class);
+
+ private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory();
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ // Nothing to do.
+ }
+
+ @Override
+ public Set<Statement> deserialize(final String topic, final byte[] data) {
+ if(data == null || data.length == 0) {
+ // Return null because that is the contract of this method.
+ return null;
+ }
+
+ try {
+ final RDFParser parser = PARSER_FACTORY.getParser();
+ final Set<Statement> statements = new HashSet<>();
+
+ parser.setRDFHandler(new RDFHandler() {
+ @Override
+ public void handleStatement(final Statement statement) throws RDFHandlerException {
+ log.debug("Statement: " + statement);
+ statements.add( statement );
+ }
+
+ @Override public void startRDF() throws RDFHandlerException { }
+ @Override public void handleNamespace(final String arg0, final String arg1) throws RDFHandlerException { }
+ @Override public void handleComment(final String arg0) throws RDFHandlerException { }
+ @Override public void endRDF() throws RDFHandlerException { }
+ });
+
+ parser.parse(new ByteArrayInputStream(data), null);
+ return statements;
+
+ } catch(final RDFParseException | RDFHandlerException | IOException e) {
+ log.error("Could not deserialize a Set of VisibilityStatement objects using the RDF4J Rio Binray format.", e);
--- End diff --
Typo: "Binray" should be "Binary".
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187657023
--- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+ @Nullable
+ final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+ @Nullable
+ final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+ // Connect a Mongo Client to the configured Mongo DB instance.
+ final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+ final boolean hasCredentials = username != null && password != null;
+
+ try(MongoClient mongoClient = hasCredentials ?
+ new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+ new MongoClient(serverAddr)) {
+ // Use a RyaClient to see if the configured instance exists.
+ // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+ final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+ config.getHostname(),
+ config.getPort(),
+ Optional.ofNullable(username),
+ Optional.ofNullable(password));
+
+ final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
+ }
+ } catch(final RyaClientException e) {
+ throw new ConnectException("Unable to determine if the Rya Instance named " +
+ config.getRyaInstanceName() + " has been installed.", e);
+ }
+ }
+
+ @Override
+ protected Sail makeSail(final Map<String, String> taskConfig) {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+
+ // Move the configuration into a Rya Configuration object.
+ final MongoDBRdfConfiguration ryaConfig = new MongoDBRdfConfiguration();
+ ConfigUtils.setUseMongo(ryaConfig, true);
+ ryaConfig.setMongoDBName( config.getRyaInstanceName() );
+ ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+ ryaConfig.setMongoHostname( config.getHostname() );
+ ryaConfig.setMongoPort( "" + config.getPort() );
+
+ if(!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) {
+ ryaConfig.setMongoUser( config.getUsername() );
+ ryaConfig.setMongoPassword( config.getPassword() );
+ }
+
+ // Create the Sail object.
+ try {
+ return RyaSailFactory.getInstance(ryaConfig);
--- End diff --
Could you be more specific? Do you mean add a geo profile that bundles the geo jars into this project?
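If the geo jars were bundled only when an optional Maven profile is active, the sink would have to tolerate their absence at runtime. A minimal sketch of such a check, assuming a hypothetical helper `OptionalClasspathFeature` (not part of Rya or Kafka Connect). The class name to probe is a placeholder, since the geo factory's fully qualified name is not shown in this thread, and real code would call the geo and default Sail factories instead of returning strings.

```java
import java.util.function.Supplier;

/** Hypothetical helper: use an optional implementation only if its class is on the classpath. */
final class OptionalClasspathFeature {
    private OptionalClasspathFeature() { }

    /** Returns true if the named class can be located without running its static initializers. */
    static boolean isOnClasspath(final String className) {
        try {
            Class.forName(className, false, OptionalClasspathFeature.class.getClassLoader());
            return true;
        } catch (final ClassNotFoundException | LinkageError e) {
            // LinkageError covers a class that is present but whose own dependencies are missing.
            return false;
        }
    }

    /** Builds the optional implementation when its marker class is available, else the fallback. */
    static <T> T chooseImpl(final String optionalClass,
                            final Supplier<T> optional,
                            final Supplier<T> fallback) {
        return isOnClasspath(optionalClass) ? optional.get() : fallback.get();
    }
}
```

Passing `false` to `Class.forName` only locates the class, so probing for it has no side effects from static initializers.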
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187736164
--- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java ---
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConfig extends RyaSinkConfig {
+
+ public static final String HOSTNAME = "mongo.hostname";
+ private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections wlll use.";
--- End diff --
Done.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187667486
--- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+ @Nullable
+ final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+ @Nullable
+ final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+ // Connect a Mongo Client to the configured Mongo DB instance.
+ final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+ final boolean hasCredentials = username != null && password != null;
+
+ try(MongoClient mongoClient = hasCredentials ?
+ new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+ new MongoClient(serverAddr)) {
+ // Use a RyaClient to see if the configured instance exists.
+ // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+ final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+ config.getHostname(),
+ config.getPort(),
+ Optional.ofNullable(username),
+ Optional.ofNullable(password));
+
+ final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
+ }
+ } catch(final RyaClientException e) {
+ throw new ConnectException("Unable to determine if the Rya Instance named " +
+ config.getRyaInstanceName() + " has been installed.", e);
+ }
+ }
+
+ @Override
+ protected Sail makeSail(final Map<String, String> taskConfig) {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+
+ // Move the configuration into a Rya Configuration object.
+ final MongoDBRdfConfiguration ryaConfig = new MongoDBRdfConfiguration();
+ ConfigUtils.setUseMongo(ryaConfig, true);
+ ryaConfig.setMongoDBName( config.getRyaInstanceName() );
+ ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+ ryaConfig.setMongoHostname( config.getHostname() );
+ ryaConfig.setMongoPort( "" + config.getPort() );
+
+ if(!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) {
+ ryaConfig.setMongoUser( config.getUsername() );
+ ryaConfig.setMongoPassword( config.getPassword() );
+ }
+
+ // Create the Sail object.
+ try {
+ return RyaSailFactory.getInstance(ryaConfig);
--- End diff --
We can ignore geo here for now. See comment up above.
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187628752
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
--- End diff --
Nit: declare the exception parameter `final` here, to match the other catch blocks in this PR.
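For context on the `final` suggestion above: since Java 7, a multi-catch parameter is already implicitly final (JLS 14.20), so adding the keyword is a consistency change, not a behavioral one. The class below is a hypothetical, self-contained illustration of that rule, not code from this PR.

```java
import java.io.IOException;

/** Hypothetical illustration: the explicit `final` on a multi-catch parameter changes nothing. */
final class MultiCatchFinal {
    private MultiCatchFinal() { }

    static String failureType(final boolean ioFailure) {
        try {
            if (ioFailure) {
                throw new IOException("simulated I/O failure");
            }
            throw new IllegalStateException("simulated state failure");
        } catch (final IOException | IllegalStateException e) {
            // Writing `e = null;` here would not compile with or without `final`,
            // because a multi-catch parameter is implicitly final.
            return e.getClass().getSimpleName();
        }
    }
}
```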
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187739254
--- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java ---
@@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() {
super();
}
- public AccumuloRdfConfiguration(Configuration other) {
+ public AccumuloRdfConfiguration(final Configuration other) {
super(other);
}
- public AccumuloRdfConfigurationBuilder getBuilder() {
+ public static AccumuloRdfConfigurationBuilder getBuilder() {
--- End diff --
👍
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187638302
--- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+ @Nullable
+ final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+ @Nullable
+ final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+ // Connect a Mongo Client to the configured Mongo DB instance.
+ final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+ final boolean hasCredentials = username != null && password != null;
+
+ try(MongoClient mongoClient = hasCredentials ?
+ new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+ new MongoClient(serverAddr)) {
+ // Use a RyaClient to see if the configured instance exists.
+ // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+ final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+ config.getHostname(),
+ config.getPort(),
+ Optional.ofNullable(username),
+ Optional.ofNullable(password));
+
+ final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
--- End diff --
Consider passing the instance name through LogUtils.clean(config.getRyaInstanceName()) before adding it to the message.
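The `LogUtils.clean` suggestion presumably guards against log forging: a user-supplied instance name containing CR or LF could start a fake line when the exception message is logged. A minimal sketch of that kind of sanitizer, using a hypothetical `LogSanitizer` class; Rya's actual `LogUtils.clean` may behave differently.

```java
/** Hypothetical stand-in for a log-sanitizing helper; not Rya's LogUtils. */
final class LogSanitizer {
    private LogSanitizer() { }

    /** Replaces CR and LF with '_' so a value cannot inject extra log lines. Null passes through. */
    static String clean(final String value) {
        if (value == null) {
            return null;
        }
        return value.replace('\r', '_').replace('\n', '_');
    }

    /** Example: build the "not installed" message with the sanitized name. */
    static String notInstalledMessage(final String ryaInstanceName) {
        return "The Rya Instance named " + clean(ryaInstanceName) + " has not been installed.";
    }
}
```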
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187734448
--- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
--- End diff --
I think stating that it uses the RDF4J Rio Binary format is more useful than describing how it does so, since the code already shows that.
---
[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:
https://github.com/apache/incubator-rya/pull/296
Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/757/
Build result: FAILURE
[...truncated 91.94 MB...]
[INFO] Apache Rya Web Projects ............................ SKIPPED
[INFO] Apache Rya Web Implementation ...................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 46:04 min
[INFO] Finished at: 2018-05-10T18:10:41+00:00
[INFO] Final Memory: 519M/2971M
[INFO] ------------------------------------------------------------------------
Waiting for Jenkins to finish collecting data
[ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.11:check (check-licenses) on project rya.kafka.connect.parent: Too many files with unapproved license: 1 See RAT report in: /home/jenkins/jenkins-slave/workspace/incubator-rya-master-with-optionals-pull-requests/extras/kafka.connect/target/rat.txt -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <goals> -rf :rya.kafka.connect.parent
channel stopped
Setting status of c2d802bd51d4d6951aa54e48b4038d64462bd8fd to FAILURE with url https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/757/ and message: 'FAILURE '
Using context: Jenkins: clean package -Pgeoindexing
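The failure above comes from the Apache RAT check-licenses execution. It found one file in the rya.kafka.connect.parent project without an approved license header. Running mvn apache-rat:check locally is the authoritative way to reproduce it, and the offending file is listed in target/rat.txt.

The sketch below is a much cruder pre-push check. It assumes that every Java, XML, Markdown, or properties source file should contain the phrase that opens the ASF header used in the diffs in this thread. LicenseHeaderCheck and its methods are hypothetical and are not part of Rya or RAT. Unlike RAT, it does no real license matching and has no exclusion list beyond skipping Maven target directories.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Hypothetical helper (not part of Rya or Apache RAT): lists source files that
 * do not contain the opening phrase of the standard ASF license header.
 */
public class LicenseHeaderCheck {

    // First line of the ASF header used by the files in this pull request.
    static final String ASF_HEADER = "Licensed to the Apache Software Foundation (ASF)";

    /** Returns true if the file text contains the ASF header phrase anywhere. */
    static boolean hasAsfHeader(String text) {
        return text.contains(ASF_HEADER);
    }

    /** Returns true if any path segment below root is a Maven "target" build directory. */
    static boolean underTarget(Path root, Path file) {
        for (Path part : root.relativize(file)) {
            if (part.toString().equals("target")) {
                return true;
            }
        }
        return false;
    }

    /** Walks root and returns matching files (by extension) that lack the header. */
    static List<Path> findUnlicensed(Path root, List<String> extensions) throws IOException {
        List<Path> missing = new ArrayList<>();
        List<Path> files;
        try (Stream<Path> paths = Files.walk(root)) {
            files = paths.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        for (Path file : files) {
            String name = file.getFileName().toString();
            if (underTarget(root, file) || extensions.stream().noneMatch(name::endsWith)) {
                continue;
            }
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            if (!hasAsfHeader(text)) {
                missing.add(file);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        List<Path> missing = findUnlicensed(root, Arrays.asList(".java", ".xml", ".md", ".properties"));
        missing.forEach(p -> System.out.println("Missing ASF header: " + p));
        System.out.println(missing.size() + " file(s) without an ASF header");
    }
}
```

For example, running it against extras/kafka.connect from the repository root prints any candidate files. Those files can then be compared against target/rat.txt.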
---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187727065
--- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+ @Override
+ protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Connect to the instance of Accumulo.
+ final Connector connector;
+ try {
+ final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+ connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+ }
+
+ // Use a RyaClient to see if the configured instance exists.
+ try {
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ config.getUsername(),
+ config.getPassword().toCharArray(),
+ config.getClusterName(),
+ config.getZookeepers());
+ final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+ if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+ throw new ConnectException("The Rya Instance named " +
+ config.getRyaInstanceName() + " has not been installed.");
+ }
+
+ } catch (final RyaClientException e) {
+ throw new ConnectException("Unable to determine if the Rya Instance named " +
+ config.getRyaInstanceName() + " has been installed.", e);
+ }
+ }
+
+ @Override
+ protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
+ requireNonNull(taskConfig);
+
+ // Parse the configuration object.
+ final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+ // Move the configuration into a Rya Configuration object.
+ final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration();
+ ryaConfig.setTablePrefix( config.getRyaInstanceName() );
+ ryaConfig.setAccumuloZookeepers( config.getZookeepers() );
+ ryaConfig.setAccumuloInstance( config.getClusterName() );
+ ryaConfig.setAccumuloUser( config.getUsername() );
+ ryaConfig.setAccumuloPassword( config.getPassword() );
+
+ // Create the Sail object.
+ try {
+ return RyaSailFactory.getInstance(ryaConfig);
--- End diff --
I'll add that to the future work section of the manual.
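In the AccumuloRyaSinkTask diff above, the Accumulo-specific class only fills in two hooks. checkRyaInstanceExists fails fast if the configured Rya instance is not installed, and makeSail builds the Sail that statements are written to.

The RyaSinkTask base class is not part of this excerpt. The sketch below therefore assumes that the base class calls the two hooks in that order when the task starts. SinkTaskSketch, InMemorySinkTask, and the "rya.instance.name" key are hypothetical stand-ins. IllegalStateException replaces Kafka's ConnectException so that the example needs only the JDK.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * Hypothetical sketch of a base task with backend-specific hooks. This is an
 * assumption about the shape of RyaSinkTask, not its actual code.
 */
abstract class SinkTaskSketch<S> {
    private S store;

    /** Backend-specific: throw if the configured instance has not been installed. */
    protected abstract void checkInstanceExists(Map<String, String> config);

    /** Backend-specific: create the object that statements will be written to. */
    protected abstract S makeStore(Map<String, String> config);

    /** Shared lifecycle: validate the target first, then build the store. */
    public final void start(Map<String, String> config) {
        Objects.requireNonNull(config);
        checkInstanceExists(config);
        store = makeStore(config);
    }

    public S store() {
        return store;
    }
}

/** Toy backend standing in for Accumulo: installed instance names live in a set. */
class InMemorySinkTask extends SinkTaskSketch<StringBuilder> {
    // Hypothetical configuration key used only by this sketch.
    static final String INSTANCE_KEY = "rya.instance.name";

    private final Set<String> installed;

    InMemorySinkTask(Set<String> installed) {
        this.installed = installed;
    }

    @Override
    protected void checkInstanceExists(Map<String, String> config) {
        String name = config.get(INSTANCE_KEY);
        if (!installed.contains(name)) {
            throw new IllegalStateException("The Rya Instance named " + name + " has not been installed.");
        }
    }

    @Override
    protected StringBuilder makeStore(Map<String, String> config) {
        // Stand-in for RyaSailFactory.getInstance(ryaConfig): just records the instance name.
        return new StringBuilder(config.get(INSTANCE_KEY));
    }
}

public class SinkTaskSketchDemo {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(InMemorySinkTask.INSTANCE_KEY, "rya_");
        InMemorySinkTask task = new InMemorySinkTask(new HashSet<>(Arrays.asList("rya_")));
        task.start(config);
        System.out.println("Store created for " + task.store());
    }
}
```

With this split, a MongoDB-backed task would only need its own versions of the two hooks. The validate-then-create ordering stays in one place.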
---