Posted to notifications@rya.apache.org by kchilton2 <gi...@git.apache.org> on 2018/05/10 16:59:08 UTC

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

GitHub user kchilton2 opened a pull request:

    https://github.com/apache/incubator-rya/pull/296

    RYA-487 Kafka Connect Sinks

    Ignore RYA-494. That's a separate PR that this one depended on to create documentation for the manual.
    
    This PR implements Kafka Connect Sinks for both Accumulo-backed and MongoDB-backed Rya. I wrote a manual page that explains the design decisions that were made, notes on deploying the sinks, a quick start, and future work we may want to consider.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kchilton2/incubator-rya RYA-487

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-rya/pull/296.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #296
    
----
commit ed1def04283a2e893a1c45014628dc7fe10717eb
Author: kchilton2 <ke...@...>
Date:   2018-05-08T22:33:24Z

    RYA-494 Fixed a bug where the shell was not loading or displaying all Statements.

commit c2d802bd51d4d6951aa54e48b4038d64462bd8fd
Author: kchilton2 <ke...@...>
Date:   2018-04-17T19:10:26Z

    RYA-487 Implement Kafka Connect Sink implementations for Accumulo and Mongo DB backed Rya.

----


---

[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/incubator-rya/pull/296
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/758/



---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187702698
  
    --- Diff: extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java ---
    @@ -0,0 +1,42 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
    +import org.junit.Test;
    +
    +/**
    + * Unit tests the methods of {@link AccumuloRyaSinkConfig}.
    + */
    +public class AccumuloRyaSinkConfigTest {
    +
    +    @Test
    +    public void parses() {
    --- End diff --
    
    Should there be other tests here confirming what happens when malformed fields are added?
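
A minimal sketch of the kind of malformed-field check the question above is asking about. Everything here is an assumption made for illustration: `SketchSinkConfig`, the `sketch.zookeepers` and `sketch.port` keys, and the `rejects` helper are hypothetical and are not taken from the PR's `AccumuloRyaSinkConfig`, which may validate its fields quite differently.

```java
import java.util.HashMap;
import java.util.Map;

public class MalformedConfigSketch {

    /** Hypothetical stand-in for a sink config; NOT the PR's AccumuloRyaSinkConfig. */
    static final class SketchSinkConfig {
        private final String zookeepers;
        private final int port;

        SketchSinkConfig(final Map<String, String> props) {
            // A required string field must be present and non-blank.
            final String zk = props.get("sketch.zookeepers");
            if (zk == null || zk.trim().isEmpty()) {
                throw new IllegalArgumentException("sketch.zookeepers is required.");
            }

            // A numeric field must parse; a missing value fails the parse as well.
            final String rawPort = props.get("sketch.port");
            final int parsedPort;
            try {
                parsedPort = Integer.parseInt(rawPort);
            } catch (final NumberFormatException e) {
                throw new IllegalArgumentException("sketch.port must be an integer: " + rawPort, e);
            }

            this.zookeepers = zk;
            this.port = parsedPort;
        }

        String getZookeepers() { return zookeepers; }

        int getPort() { return port; }
    }

    /** Returns true if constructing the hypothetical config rejects the properties. */
    static boolean rejects(final Map<String, String> props) {
        try {
            new SketchSinkConfig(props);
            return false;
        } catch (final IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        final Map<String, String> valid = new HashMap<>();
        valid.put("sketch.zookeepers", "zoo1,zoo2");
        valid.put("sketch.port", "2181");
        System.out.println("valid rejected: " + rejects(valid));

        final Map<String, String> badPort = new HashMap<>(valid);
        badPort.put("sketch.port", "not-a-number");
        System.out.println("bad port rejected: " + rejects(badPort));

        System.out.println("empty rejected: " + rejects(new HashMap<>()));
    }
}
```

In a JUnit test class the same cases would likely become one `@Test(expected = ...)` method per malformed input, next to the existing `parses()` test.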


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187708536
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
    @@ -0,0 +1,144 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api.sink;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.kafka.connect.sink.SinkRecord;
    +import org.apache.kafka.connect.sink.SinkTask;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.repository.sail.SailRepository;
    +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.jcabi.manifests.Manifests;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * Handles the common components required to write {@link Statement}s to Rya.
    + * <p/>
    + * Implementations of this class only need to specify functionality that is specific to the
    + * Rya implementation.
    + */
    +@DefaultAnnotation(NonNull.class)
    --- End diff --
    
    Maybe I'm not clear on what this annotation does, but again, all the fields are declared Nullable.
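
A rough illustration of the rule in question, assuming the usual reading of the FindBugs annotations: a class-level `@DefaultAnnotation(NonNull.class)` supplies a default for members that carry no nullness annotation of their own, and an explicit `@Nullable` on a field takes precedence over that default. To stay self-contained, the sketch does not use the FindBugs library. `SketchDefault`, `SketchNonNull`, `SketchNullable`, `Task` and `effectiveNullness` are hypothetical stand-ins that only mimic the lookup order with reflection.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class DefaultAnnotationSketch {

    // Hypothetical stand-ins for the FindBugs NonNull / Nullable / DefaultAnnotation types.
    @Retention(RetentionPolicy.RUNTIME)
    @interface SketchNonNull {}

    @Retention(RetentionPolicy.RUNTIME)
    @interface SketchNullable {}

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface SketchDefault {
        Class<? extends Annotation> value();
    }

    /** The class-level default says "non-null unless stated otherwise". */
    @SketchDefault(SketchNonNull.class)
    static class Task {
        // Explicitly opts out of the class-level default.
        @SketchNullable
        Object connection = null;

        // No annotation of its own, so the class-level default applies.
        Object logger = new Object();
    }

    /** Resolves nullness the way the sketch assumes: field annotation first, then class default. */
    static String effectiveNullness(final Class<?> type, final String fieldName) {
        final Field field;
        try {
            field = type.getDeclaredField(fieldName);
        } catch (final NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }

        if (field.isAnnotationPresent(SketchNullable.class)) {
            return "Nullable";
        }
        if (field.isAnnotationPresent(SketchNonNull.class)) {
            return "NonNull";
        }

        final SketchDefault classDefault = type.getAnnotation(SketchDefault.class);
        if (classDefault != null && SketchNonNull.class.equals(classDefault.value())) {
            return "NonNull";
        }
        return "Unknown";
    }

    public static void main(final String[] args) {
        System.out.println("connection: " + effectiveNullness(Task.class, "connection"));
        System.out.println("logger: " + effectiveNullness(Task.class, "logger"));
    }
}
```

Under that reading, having every field marked Nullable does not make the class-level annotation redundant, since the default would also cover method parameters and return values.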


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187730865
  
    --- Diff: extras/kafka.connect/client/pom.xml ---
    @@ -0,0 +1,135 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one
    +  or more contributor license agreements.  See the NOTICE file
    +  distributed with this work for additional information
    +  regarding copyright ownership.  The ASF licenses this file
    +  to you under the Apache License, Version 2.0 (the
    +  "License"); you may not use this file except in compliance
    +  with the License.  You may obtain a copy of the License at
    +
    +    http://www.apache.org/licenses/LICENSE-2.0
    +
    +  Unless required by applicable law or agreed to in writing,
    +  software distributed under the License is distributed on an
    +  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +  KIND, either express or implied.  See the License for the
    +  specific language governing permissions and limitations
    +  under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" 
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.rya</groupId>
    +        <artifactId>rya.kafka.connect.parent</artifactId>
    +        <version>4.0.0-incubating-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>rya.kafka.connect.client</artifactId>
    +
    +    <name>Apache Rya Kafka Connect - Client</name>
    +    <description>Contains a client that may be used to load Statements into 
    +                 a Kafka topic to be read by Kafka Connect.</description>
    +
    +    <dependencies>
    +        <!-- 1st party dependencies. -->
    +        <dependency>
    +            <groupId>org.apache.rya</groupId>
    +            <artifactId>rya.sail</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.rya</groupId>
    +            <artifactId>rya.kafka.connect.api</artifactId>
    +        </dependency>
    +    
    +        <!-- 3rd party dependencies. -->
    +        <dependency>
    +            <groupId>org.eclipse.rdf4j</groupId>
    +            <artifactId>rdf4j-model</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>com.google.guava</groupId>
    +            <artifactId>guava</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>com.beust</groupId>
    +            <artifactId>jcommander</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>com.github.stephenc.findbugs</groupId>
    +            <artifactId>findbugs-annotations</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.kafka</groupId>
    +            <artifactId>kafka-clients</artifactId>
    +        </dependency>
    +        
    +        <!-- Statement formats we support for loading. -->
    --- End diff --
    
    It does, good call. Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187636622
  
    --- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java ---
    @@ -0,0 +1,121 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.client;
    +
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException;
    +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException;
    +import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand;
    +import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand;
    +import org.eclipse.rdf4j.model.Statement;
    +
    +import com.google.common.collect.ImmutableMap;
    +import com.google.common.collect.Lists;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A CLI tool used to read/write {@link Statement}s to/from a Kafka topic using the format
    + * the Rya Kafka Connect Sinks expect.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class CLIDriver {
    +
    +    /**
    +     * Maps from command strings to the object that performs the command.
    +     */
    +    private static final ImmutableMap<String, RyaKafkaClientCommand> COMMANDS;
    +    static {
    +        final Set<Class<? extends RyaKafkaClientCommand>> commandClasses = new HashSet<>();
    +        commandClasses.add(ReadStatementsCommand.class);
    +        commandClasses.add(WriteStatementsCommand.class);
    +        final ImmutableMap.Builder<String, RyaKafkaClientCommand> builder = ImmutableMap.builder();
    +        for(final Class<? extends RyaKafkaClientCommand> commandClass : commandClasses) {
    +            try {
    +                final RyaKafkaClientCommand command = commandClass.newInstance();
    +                builder.put(command.getCommand(), command);
    +            } catch (InstantiationException | IllegalAccessException e) {
    +                System.err.println("Could not run the application because a RyaKafkaClientCommand is missing its empty constructor.");
    +                e.printStackTrace();
    +            }
    +        }
    +        COMMANDS = builder.build();
    +    }
    +
    +    private static final String USAGE = makeUsage(COMMANDS);
    +
    +    public static void main(final String[] args) {
    +        // If no command provided or the command isn't recognized, then print the usage.
    +        if (args.length == 0 || !COMMANDS.containsKey(args[0])) {
    +            System.out.println(USAGE);
    +            System.exit(1);
    +        }
    +
    +        // Fetch the command that will be executed.
    +        final String command = args[0];
    +        final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
    +        final RyaKafkaClientCommand clientCommand = COMMANDS.get(command);
    +
    +        // Print usage if the arguments are invalid for the command.
    +        if(!clientCommand.validArguments(commandArgs)) {
    +            System.out.println(clientCommand.getUsage());
    +            System.exit(1);
    +        }
    +
    +        // Execute the command.
    +            try {
    --- End diff --
    
    Spacing is off


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187736288
  
    --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java ---
    @@ -0,0 +1,63 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.mongo;
    +
    +import java.util.Map;
    +
    +import org.apache.kafka.common.config.AbstractConfig;
    +import org.apache.kafka.common.config.ConfigDef;
    +import org.apache.kafka.connect.connector.Task;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class MongoRyaSinkConnector extends RyaSinkConnector {
    +
    +    @Nullable
    +    private MongoRyaSinkConfig config = null;
    +
    +    @Override
    +    public void start(final Map<String, String> props) {
    +        this.config = new MongoRyaSinkConfig( props );
    +    }
    +
    +    @Override
    +    protected AbstractConfig getConfig() {
    +        if(config == null) {
    +            throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187733216
  
    --- Diff: extras/kafka.connect/README.md ---
    @@ -0,0 +1,22 @@
    +<!-- Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License. -->
    +
    +The parent project for all Rya Kafka Connect work. All projects thare are part 
    +of that system must use this project's pom as their parent pom.
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-rya/pull/296


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187739885
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
    @@ -0,0 +1,144 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api.sink;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.kafka.connect.sink.SinkRecord;
    +import org.apache.kafka.connect.sink.SinkTask;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.repository.sail.SailRepository;
    +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.jcabi.manifests.Manifests;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * Handles the common components required to write {@link Statement}s to Rya.
    + * <p/>
    + * Implementations of this class only need to specify functionality that is specific to the
    + * Rya implementation.
    + */
    +@DefaultAnnotation(NonNull.class)
    --- End diff --
    
    gotcha, thanks for the doc paste dump


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187698179
  
    --- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java ---
    @@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() {
             super();
         }
     
    -    public AccumuloRdfConfiguration(Configuration other) {
    +    public AccumuloRdfConfiguration(final Configuration other) {
             super(other);
         }
     
    -    public AccumuloRdfConfigurationBuilder getBuilder() {
    +    public static AccumuloRdfConfigurationBuilder getBuilder() {
    --- End diff --
    
    Isn't the convention usually builder() for static builder functions?
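
For reference, a minimal sketch of the naming convention being suggested, where a static factory named `builder()` returns a fresh builder. `SketchConfig`, its fields and the setter names are hypothetical and are not the actual `AccumuloRdfConfiguration` or `AccumuloRdfConfigurationBuilder` API.

```java
public class BuilderConventionSketch {

    /** Hypothetical configuration type; NOT the PR's AccumuloRdfConfiguration. */
    static final class SketchConfig {
        private final String user;
        private final String instance;

        private SketchConfig(final Builder builder) {
            this.user = builder.user;
            this.instance = builder.instance;
        }

        /** Static entry point, named builder() rather than getBuilder(). */
        static Builder builder() {
            return new Builder();
        }

        String getUser() { return user; }

        String getInstance() { return instance; }

        /** Collects values and produces an immutable SketchConfig. */
        static final class Builder {
            private String user = "";
            private String instance = "";

            Builder setUser(final String user) {
                this.user = user;
                return this;
            }

            Builder setInstance(final String instance) {
                this.instance = instance;
                return this;
            }

            SketchConfig build() {
                return new SketchConfig(this);
            }
        }
    }

    public static void main(final String[] args) {
        final SketchConfig config = SketchConfig.builder()
                .setUser("root")
                .setInstance("demo")
                .build();
        System.out.println(config.getUser() + "@" + config.getInstance());
    }
}
```

The `get` prefix tends to suggest an accessor on an existing instance, which is one reason a plain `builder()` reads more naturally for a static factory.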


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187634162
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java ---
    @@ -0,0 +1,69 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api.sink;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.config.AbstractConfig;
    +import org.apache.kafka.connect.sink.SinkConnector;
    +
    +import com.jcabi.manifests.Manifests;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * Handles the common components required to task {@link RyaSinkTask}s that write to Rya.
    + * </p>
    + * Implementations of this class only need to specify functionality that is specific to the Rya implementation.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public abstract class RyaSinkConnector extends SinkConnector {
    +
    +    /**
    +     * Get the configuration that will be provided to the tasks when {@link #taskConfigs(int)} is invoked.
    +     * </p>
    +     * Only called after start has been invoked
    +     *
    +     * @return The configuration object for the connector.
    +     * @throws IllegalStateException Thrown if {@link SinkConnector#start(Map)} has not been invoked yet.
    +     */
    +    protected abstract AbstractConfig getConfig() throws IllegalStateException;
    +
    +    @Override
    +    public String version() {
    +        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN";
    +    }
    +
    +    @Override
    +    public List<Map<String, String>> taskConfigs(final int maxTasks) {
    +        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
    +        for(int i = 0; i < maxTasks; i++) {
    +            configs.add( getConfig().originalsStrings() );
    +        }
    +        return configs;
    +    }
    +
    +    @Override
    +    public void stop() {
    +        // Nothing to do since the RyaSinkconnector has no background monitoring.
    --- End diff --
    
    typo. RyaSinkConnector


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187702555
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    +            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
    +        }
    +
    +        // Use a RyaClient to see if the configured instance exists.
    +        try {
    +            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    +                    config.getUsername(),
    +                    config.getPassword().toCharArray(),
    +                    config.getClusterName(),
    +                    config.getZookeepers());
    +            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
    +
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    +            }
    +
    +        } catch (final RyaClientException e) {
    +            throw new ConnectException("Unable to determine if the Rya Instance named " +
    +                    config.getRyaInstanceName() + " has been installed.", e);
    +        }
    +    }
    +
    +    @Override
    +    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Move the configuration into a Rya Configuration object.
    +        final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration();
    +        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
    +        ryaConfig.setAccumuloZookeepers( config.getZookeepers() );
    +        ryaConfig.setAccumuloInstance( config.getClusterName() );
    +        ryaConfig.setAccumuloUser( config.getUsername() );
    +        ryaConfig.setAccumuloPassword( config.getPassword() );
    +
    +        // Create the Sail object.
    +        try {
    +            return RyaSailFactory.getInstance(ryaConfig);
    --- End diff --
    
    Yeah, when we started the Rya Streams project, Geo was considered a non-requirement.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187629343
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    +            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
    +        }
    +
    +        // Use a RyaClient to see if the configured instance exists.
    +        try {
    +            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    +                    config.getUsername(),
    +                    config.getPassword().toCharArray(),
    +                    config.getClusterName(),
    +                    config.getZookeepers());
    +            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
    +
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    --- End diff --
    
    Use LogUtils to clean the instance name before it is included in the message:
    LogUtils.clean(config.getRyaInstanceName())
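
    As a rough illustration of what this kind of cleaning guards against: a value taken from configuration may contain line breaks, which would let it forge extra lines in a log or error message. The sketch below uses only the JDK. LogSanitizerSketch and its clean method are hypothetical stand-ins for illustration, not Rya's actual LogUtils implementation.

```java
/**
 * Hypothetical stand-in for a log-cleaning helper. This is NOT Rya's
 * LogUtils; it only shows the general idea of neutralizing line breaks.
 */
final class LogSanitizerSketch {
    private LogSanitizerSketch() { }

    /** Replaces carriage returns and line feeds so the value stays on one line. */
    static String clean(final String value) {
        if (value == null) {
            return "null";
        }
        return value.replace('\r', '_').replace('\n', '_');
    }

    public static void main(final String[] args) {
        // A configured name that tries to inject a second line into the message.
        final String instanceName = "rya_\nERROR forged entry";
        System.out.println("The Rya Instance named " + clean(instanceName) + " has not been installed.");
    }
}
```

    With a helper of this shape, the exception message is built from the cleaned name rather than the raw configuration value.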


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187633086
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java ---
    @@ -0,0 +1,77 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.rio.RDFWriter;
    +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
    + * using the RDF4J Rio Binary format.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class StatementsSerializer implements Serializer<Set<Statement>> {
    +    private static final Logger log = LoggerFactory.getLogger(StatementsSerializer.class);
    +
    +    private static final BinaryRDFWriterFactory WRITER_FACTORY = new BinaryRDFWriterFactory();
    +
    +    @Override
    +    public void configure(final Map<String, ?> configs, final boolean isKey) {
    +        // Nothing to do.
    +    }
    +
    +    @Override
    +    public byte[] serialize(final String topic, final Set<Statement> data) {
    +        if(data == null) {
    +            // Returning null because that is the contract of this method.
    +            return null;
    +        }
    +
    +        // Write the statements using a Binary RDF Writer.
    +        final ByteArrayOutputStream boas = new ByteArrayOutputStream();
    --- End diff --
    
    Rename the variable from boas to baos.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187709021
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
    @@ -0,0 +1,144 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api.sink;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.kafka.connect.sink.SinkRecord;
    +import org.apache.kafka.connect.sink.SinkTask;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.repository.sail.SailRepository;
    +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.jcabi.manifests.Manifests;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * Handles the common components required to write {@link Statement}s to Rya.
    + * <p/>
    + * Implementations of this class only need to specify functionality that is specific to the
    + * Rya implementation.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public abstract class RyaSinkTask extends SinkTask {
    +    private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
    +
    +    @Nullable
    +    private SailRepository sailRepo = null;
    +
    +    @Nullable
    +    private SailRepositoryConnection conn = null;
    +
    +    /**
    +     * Throws an exception if the configured Rya Instance is not already installed
    +     * within the configured database.
    +     *
    +     * @param taskConfig - The configuration values that were provided to the task. (not null)
    +     * @throws ConnectException The configured Rya Instance is not installed to the configured database
    +     *   or we were unable to figure out if it is installed.
    +     */
    +    protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
    +
    +    /**
    +     * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
    +     * Rya Instance.
    +     *
    +     * @param taskConfig - Configures how the Sail object will be created. (not null)
    +     * @return The created Sail object.
    +     * @throws ConnectException The Sail object could not be made.
    +     */
    +    protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
    +
    +    @Override
    +    public String version() {
    +        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
    +    }
    +
    +    @Override
    +    public void start(final Map<String, String> props) throws ConnectException {
    +        requireNonNull(props);
    +
    +        // Ensure the configured Rya Instance is installed within the configured database.
    +        checkRyaInstanceExists(props);
    +
    +        // Create the Sail object that is connected to the Rya Instance.
    +        final Sail sail = makeSail(props);
    +        sailRepo = new SailRepository( sail );
    +        conn = sailRepo.getConnection();
    +    }
    +
    +    @Override
    +    public void put(final Collection<SinkRecord> records) {
    +        requireNonNull(records);
    +
    +        // Return immediately if there are no records to handle.
    +        if(records.isEmpty()) {
    +            return;
    +        }
    +
    +        // If a transaction has not been started yet, then start one.
    +        if(!conn.isActive()) {
    +            conn.begin();
    +        }
    +
    +        // Iterate through the records and write them to the Sail object.
    +        for(final SinkRecord record : records) {
    +            // If everything has been configured correctly, then the record's value will be a Set<Statement>.
    +            conn.add((Set<? extends Statement>) record.value());
    +        }
    +    }
    +
    +    @Override
    +    public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffets) {
    +        // Flush the current transaction.
    +        conn.commit();
    +    }
    +
    +    @Override
    +    public void stop() {
    +        try {
    --- End diff --
    
    What about the actual Sail that is used? Are you expecting it to be shut down by whatever uses this?
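
    To make the concern concrete, here is a minimal sketch of a stop() that releases every resource it owns, in order, and keeps going if one of them fails. It uses only the JDK. Resource and ShutdownOrderSketch are hypothetical stand-ins for the connection, repository, and Sail, not the RDF4J types, and whether a repository's own shutdown already covers its Sail is not assumed either way.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of an ordered shutdown. Resource stands in for the
 * connection, repository, and Sail; it is not an RDF4J interface.
 */
final class ShutdownOrderSketch {
    private ShutdownOrderSketch() { }

    /** Stand-in for anything that must be released when the task stops. */
    interface Resource {
        void shutDown() throws Exception;
    }

    /**
     * Shuts down each resource in the given order. A failure is recorded
     * and the remaining resources are still shut down.
     *
     * @return the messages of any failures, in the order they occurred.
     */
    static List<String> shutDownAll(final List<Resource> resources) {
        final List<String> failures = new ArrayList<>();
        for (final Resource resource : resources) {
            try {
                resource.shutDown();
            } catch (final Exception e) {
                failures.add(e.getMessage());
            }
        }
        return failures;
    }
}
```

    The caller would pass the connection first, then the repository, then the Sail, so that the innermost resource is released first.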


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187734692
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java ---
    @@ -0,0 +1,63 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import java.util.Map;
    +
    +import org.apache.kafka.common.config.AbstractConfig;
    +import org.apache.kafka.common.config.ConfigDef;
    +import org.apache.kafka.connect.connector.Task;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkConnector extends RyaSinkConnector {
    +
    +    @Nullable
    +    private AccumuloRyaSinkConfig config = null;
    +
    +    @Override
    +    public void start(final Map<String, String> props) {
    +        this.config = new AccumuloRyaSinkConfig( props );
    +    }
    +
    +    @Override
    +    protected AbstractConfig getConfig() {
    +        if(config == null) {
    +            throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187638405
  
    --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java ---
    @@ -0,0 +1,122 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.mongo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Arrays;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.mongo.MongoConnectionDetails;
    +import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.indexing.accumulo.ConfigUtils;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.mongodb.MongoDBRdfConfiguration;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.MongoClient;
    +import com.mongodb.MongoCredential;
    +import com.mongodb.ServerAddress;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class MongoRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
    +        @Nullable
    +        final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
    +        @Nullable
    +        final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
    +
    +        // Connect a Mongo Client to the configured Mongo DB instance.
    +        final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
    +        final boolean hasCredentials = username != null && password != null;
    +
    +        try(MongoClient mongoClient = hasCredentials ?
    +                new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
    +                new MongoClient(serverAddr)) {
    +            // Use a RyaClient to see if the configured instance exists.
    +            // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
    +            final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
    +                    config.getHostname(),
    +                    config.getPort(),
    +                    Optional.ofNullable(username),
    +                    Optional.ofNullable(password));
    +
    +            final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    +            }
    +        } catch(final RyaClientException e) {
    +            throw new ConnectException("Unable to determine if the Rya Instance named " +
    +                    config.getRyaInstanceName() + " has been installed.", e);
    +        }
    +    }
    +
    +    @Override
    +    protected Sail makeSail(final Map<String, String> taskConfig) {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
    +
    +        // Move the configuration into a Rya Configuration object.
    +        final MongoDBRdfConfiguration ryaConfig = new MongoDBRdfConfiguration();
    +        ConfigUtils.setUseMongo(ryaConfig, true);
    +        ryaConfig.setMongoDBName( config.getRyaInstanceName() );
    +        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
    +        ryaConfig.setMongoHostname( config.getHostname() );
    +        ryaConfig.setMongoPort( "" + config.getPort() );
    +
    +        if(!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) {
    +            ryaConfig.setMongoUser( config.getUsername() );
    +            ryaConfig.setMongoPassword( config.getPassword() );
    +        }
    +
    +        // Create the Sail object.
    +        try {
    +            return RyaSailFactory.getInstance(ryaConfig);
    --- End diff --
    
    Add geo support here.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187667235
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    +            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
    +        }
    +
    +        // Use a RyaClient to see if the configured instance exists.
    +        try {
    +            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    +                    config.getUsername(),
    +                    config.getPassword().toCharArray(),
    +                    config.getClusterName(),
    +                    config.getZookeepers());
    +            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
    +
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    +            }
    +
    +        } catch (final RyaClientException e) {
    +            throw new ConnectException("Unable to determine if the Rya Instance named " +
    +                    config.getRyaInstanceName() + " has been installed.", e);
    +        }
    +    }
    +
    +    @Override
    +    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Move the configuration into a Rya Configuration object.
    +        final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration();
    +        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
    +        ryaConfig.setAccumuloZookeepers( config.getZookeepers() );
    +        ryaConfig.setAccumuloInstance( config.getClusterName() );
    +        ryaConfig.setAccumuloUser( config.getUsername() );
    +        ryaConfig.setAccumuloPassword( config.getPassword() );
    +
    +        // Create the Sail object.
    +        try {
    +            return RyaSailFactory.getInstance(ryaConfig);
    --- End diff --
    
    I guess adding geo here can be a future improvement, for when we use plugins and no longer need to reference geo code directly. Leave it as RyaSailFactory for now.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187706947
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java ---
    @@ -0,0 +1,77 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.rio.RDFWriter;
    +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
    + * using the RDF4J Rio Binary format.
    --- End diff --
    
    I feel like it might be worthwhile to mention that you use the RDFWriter here and the RDFParser in the Deserializer.
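
    The reason this is worth documenting is that the two classes only work as a pair: whatever the writer emits on one side, the parser on the other side has to read back. A minimal JDK-only sketch of that symmetry follows. RoundTripSketch is hypothetical; DataOutputStream and DataInputStream stand in for the RDF4J binary writer and parser, and plain strings stand in for Statements.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Hypothetical writer/parser pair. The JDK data streams stand in for the
 * RDF4J binary format; strings stand in for Statements.
 */
final class RoundTripSketch {
    private RoundTripSketch() { }

    /** Writer side: a count followed by each element. */
    static byte[] serialize(final Set<String> data) {
        if (data == null) {
            return null;
        }
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(data.size());
            for (final String element : data) {
                out.writeUTF(element);
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    /** Parser side: reads back exactly what serialize wrote. */
    static Set<String> deserialize(final byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        final Set<String> data = new LinkedHashSet<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            final int size = in.readInt();
            for (int i = 0; i < size; i++) {
                data.add(in.readUTF());
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return data;
    }
}
```

    Naming the counterpart in each class's Javadoc makes it harder for one side's format to change without the other.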


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187734887
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
    @@ -0,0 +1,144 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api.sink;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.kafka.connect.sink.SinkRecord;
    +import org.apache.kafka.connect.sink.SinkTask;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.repository.sail.SailRepository;
    +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.jcabi.manifests.Manifests;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * Handles the common components required to write {@link Statement}s to Rya.
    + * <p/>
    + * Implementations of this class only need to specify functionality that is specific to the
    + * Rya implementation.
    + */
    +@DefaultAnnotation(NonNull.class)
    --- End diff --
    
    It indicates that, by default, parameters are non-null unless they are explicitly marked otherwise.
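
    A minimal sketch of that convention follows. The three annotation types declared at the top are local stand-ins so the example compiles on its own; the real ones come from edu.umd.cs.findbugs.annotations, as imported in the diff above. The Greeter class and its members are hypothetical. The annotations are only hints for static analysis, which is presumably why the PR code also calls requireNonNull at runtime.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Objects;

// Stand-ins for the FindBugs annotations (assumption: declared locally so the sketch is self-contained).
@Retention(RetentionPolicy.CLASS) @interface NonNull {}
@Retention(RetentionPolicy.CLASS) @interface Nullable {}
@Retention(RetentionPolicy.CLASS) @interface DefaultAnnotation {
    Class<? extends Annotation>[] value();
}

// Everything in this class is treated as non-null by default...
@DefaultAnnotation(NonNull.class)
class Greeter {

    // ...except elements that opt out explicitly.
    @Nullable
    private String lastName = null;

    String greet(final String name) {
        // The annotation does nothing at runtime, so the check is still explicit.
        Objects.requireNonNull(name);
        lastName = name;
        return "Hello, " + name;
    }

    @Nullable
    String getLastName() {
        return lastName;
    }
}
```

    A class-level default therefore stays meaningful even when one field is marked @Nullable: the explicit annotation overrides the default for that element only.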


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187729286
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    +            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
    +        }
    +
    +        // Use a RyaClient to see if the configured instance exists.
    +        try {
    +            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    +                    config.getUsername(),
    +                    config.getPassword().toCharArray(),
    +                    config.getClusterName(),
    +                    config.getZookeepers());
    +            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
    +
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    --- End diff --
    
    I'm assuming rya.api. Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187735910
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
    @@ -0,0 +1,144 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api.sink;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.kafka.connect.sink.SinkRecord;
    +import org.apache.kafka.connect.sink.SinkTask;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.repository.sail.SailRepository;
    +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.jcabi.manifests.Manifests;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * Handles the common components required to write {@link Statement}s to Rya.
    + * <p/>
    + * Implementations of this class only need to specify functionality that is specific to the
    + * Rya implementation.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public abstract class RyaSinkTask extends SinkTask {
    +    private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
    +
    +    @Nullable
    +    private SailRepository sailRepo = null;
    +
    +    @Nullable
    +    private SailRepositoryConnection conn = null;
    +
    +    /**
    +     * Throws an exception if the configured Rya Instance is not already installed
    +     * within the configured database.
    +     *
    +     * @param taskConfig - The configuration values that were provided to the task. (not null)
    +     * @throws ConnectException The configured Rya Instance is not installed to the configured database
    +     *   or we were unable to figure out if it is installed.
    +     */
    +    protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
    +
    +    /**
    +     * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
    +     * Rya Instance.
    +     *
    +     * @param taskConfig - Configures how the Sail object will be created. (not null)
    +     * @return The created Sail object.
    +     * @throws ConnectException The Sail object could not be made.
    +     */
    +    protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
    +
    +    @Override
    +    public String version() {
    +        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
    +    }
    +
    +    @Override
    +    public void start(final Map<String, String> props) throws ConnectException {
    +        requireNonNull(props);
    +
    +        // Ensure the configured Rya Instance is installed within the configured database.
    +        checkRyaInstanceExists(props);
    +
    +        // Create the Sail object that is connected to the Rya Instance.
    +        final Sail sail = makeSail(props);
    +        sailRepo = new SailRepository( sail );
    +        conn = sailRepo.getConnection();
    +    }
    +
    +    @Override
    +    public void put(final Collection<SinkRecord> records) {
    +        requireNonNull(records);
    +
    +        // Return immediately if there are no records to handle.
    +        if(records.isEmpty()) {
    +            return;
    +        }
    +
    +        // If a transaction has not been started yet, then start one.
    +        if(!conn.isActive()) {
    +            conn.begin();
    +        }
    +
    +        // Iterate through the records and write them to the Sail object.
    +        for(final SinkRecord record : records) {
    +            // If everything has been configured correctly, then the record's value will be a Set<Statement>.
    +            conn.add((Set<? extends Statement>) record.value());
    +        }
    +    }
    +
    +    @Override
    +    public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    +        // Flush the current transaction.
    +        conn.commit();
    +    }
    +
    +    @Override
    +    public void stop() {
    +        try {
    --- End diff --
    
    That Sail object is shut down internally by the SailRepository class when the SailRepository's shutdown method is invoked. Check line 169 of SailRepository.
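
    The delegation described here can be sketched with stand-in classes. TrackingSail and WrappingRepository are hypothetical names used only for illustration; they are not the RDF4J types and this is not the RDF4J source. The point is only that the wrapper owns the wrapped object's lifecycle, so the caller shuts down one thing.

```java
import java.util.Objects;

// Stand-in for a Sail: records whether it has been shut down.
class TrackingSail {
    private boolean shutDown = false;

    void shutDown() {
        shutDown = true;
    }

    boolean isShutDown() {
        return shutDown;
    }
}

// Stand-in for a repository that wraps a sail and owns its lifecycle.
class WrappingRepository {
    private final TrackingSail sail;

    WrappingRepository(final TrackingSail sail) {
        this.sail = Objects.requireNonNull(sail);
    }

    // Shutting down the repository also shuts down the wrapped sail.
    void shutDown() {
        sail.shutDown();
    }
}
```

    Under that assumption, a stop() method only needs to close the connection and shut down the repository; shutting down the sail separately would be redundant.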


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187640119
  
    --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java ---
    @@ -274,16 +274,17 @@ public boolean getUseAggregationPipeline() {
          * on their child subtrees.
          * @param value whether to use aggregation pipeline optimization.
          */
    -    public void setUseAggregationPipeline(boolean value) {
    +    public void setUseAggregationPipeline(final boolean value) {
             setBoolean(USE_AGGREGATION_PIPELINE, value);
         }
     
         @Override
         public List<Class<QueryOptimizer>> getOptimizers() {
    -        List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
    +        final List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
             if (getUseAggregationPipeline()) {
    -            Class<?> cl = AggregationPipelineQueryOptimizer.class;
    +            final Class<?> cl = AggregationPipelineQueryOptimizer.class;
                 @SuppressWarnings("unchecked")
    +            final
    --- End diff --
    
    Put `final` on the same line as the variable it modifies.
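
    For illustration, a small self-contained sketch of the requested layout: the annotation keeps its own line and `final` stays with the declaration. The class and method names are hypothetical and unrelated to MongoDBRdfConfiguration.

```java
import java.util.List;

class FinalPlacementExample {

    // Narrows a wildcard list to List<String>; the caller guarantees the element type.
    static List<String> asStrings(final List<?> raw) {
        @SuppressWarnings("unchecked")
        final List<String> typed = (List<String>) raw;
        return typed;
    }
}
```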


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187629391
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    +            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
    +        }
    +
    +        // Use a RyaClient to see if the configured instance exists.
    +        try {
    +            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    +                    config.getUsername(),
    +                    config.getPassword().toCharArray(),
    +                    config.getClusterName(),
    +                    config.getZookeepers());
    +            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
    +
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    +            }
    +
    +        } catch (final RyaClientException e) {
    +            throw new ConnectException("Unable to determine if the Rya Instance named " +
    +                    config.getRyaInstanceName() + " has been installed.", e);
    --- End diff --
    
    same


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187731028
  
    --- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java ---
    @@ -0,0 +1,121 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.client;
    +
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException;
    +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException;
    +import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand;
    +import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand;
    +import org.eclipse.rdf4j.model.Statement;
    +
    +import com.google.common.collect.ImmutableMap;
    +import com.google.common.collect.Lists;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A CLI tool used to read/write {@link Statement}s to/from a Kafka topic using the format
    + * the Rya Kafka Connect Sinks expect.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class CLIDriver {
    +
    +    /**
    +     * Maps from command strings to the object that performs the command.
    +     */
    +    private static final ImmutableMap<String, RyaKafkaClientCommand> COMMANDS;
    +    static {
    +        final Set<Class<? extends RyaKafkaClientCommand>> commandClasses = new HashSet<>();
    +        commandClasses.add(ReadStatementsCommand.class);
    +        commandClasses.add(WriteStatementsCommand.class);
    +        final ImmutableMap.Builder<String, RyaKafkaClientCommand> builder = ImmutableMap.builder();
    +        for(final Class<? extends RyaKafkaClientCommand> commandClass : commandClasses) {
    +            try {
    +                final RyaKafkaClientCommand command = commandClass.newInstance();
    +                builder.put(command.getCommand(), command);
    +            } catch (InstantiationException | IllegalAccessException e) {
    +                System.err.println("Could not run the application because a RyaKafkaClientCommand is missing its empty constructor.");
    +                e.printStackTrace();
    +            }
    +        }
    +        COMMANDS = builder.build();
    +    }
    +
    +    private static final String USAGE = makeUsage(COMMANDS);
    +
    +    public static void main(final String[] args) {
    +        // If no command provided or the command isn't recognized, then print the usage.
    +        if (args.length == 0 || !COMMANDS.containsKey(args[0])) {
    +            System.out.println(USAGE);
    +            System.exit(1);
    +        }
    +
    +        // Fetch the command that will be executed.
    +        final String command = args[0];
    +        final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
    +        final RyaKafkaClientCommand clientCommand = COMMANDS.get(command);
    +
    +        // Print usage if the arguments are invalid for the command.
    +        if(!clientCommand.validArguments(commandArgs)) {
    +            System.out.println(clientCommand.getUsage());
    +            System.exit(1);
    +        }
    +
    +        // Execute the command.
    +            try {
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187701248
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java ---
    @@ -0,0 +1,63 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import java.util.Map;
    +
    +import org.apache.kafka.common.config.AbstractConfig;
    +import org.apache.kafka.common.config.ConfigDef;
    +import org.apache.kafka.connect.connector.Task;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
    + */
    +@DefaultAnnotation(NonNull.class)
    --- End diff --
    
    Since there is one field that is marked nullable, is this annotation useful?


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187742000
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java ---
    @@ -0,0 +1,77 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.rio.RDFWriter;
    +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
    + * using the RDF4J Rio Binary format.
    --- End diff --
    
    I mean that's true, but I don't think it's worth documenting.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187702276
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    +            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
    +        }
    +
    +        // Use a RyaClient to see if the configured instance exists.
    +        try {
    +            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    +                    config.getUsername(),
    +                    config.getPassword().toCharArray(),
    +                    config.getClusterName(),
    +                    config.getZookeepers());
    +            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
    +
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    --- End diff --
    
    LogUtils from which package?  


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187637697
  
    --- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java ---
    @@ -0,0 +1,187 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.client.command;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.HashSet;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.kafka.connect.api.StatementsSerializer;
    +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
    +import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.rio.RDFFormat;
    +import org.eclipse.rdf4j.rio.RDFHandlerException;
    +import org.eclipse.rdf4j.rio.RDFParseException;
    +import org.eclipse.rdf4j.rio.RDFParser;
    +import org.eclipse.rdf4j.rio.Rio;
    +import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
    +import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.beust.jcommander.JCommander;
    +import com.beust.jcommander.Parameter;
    +import com.beust.jcommander.ParameterException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect Sink format.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class WriteStatementsCommand implements RyaKafkaClientCommand {
    +    private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class);
    +
    +    /**
    +     * Command line parameters that are used by this command to configure itself.
    +     */
    +    public static class WriteParameters extends KafkaParameters {
    +        @Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.")
    +        public String statementsFile;
    +    }
    +
    +    @Override
    +    public String getCommand() {
    +        return "write";
    +    }
    +
    +    @Override
    +    public String getDescription() {
    +        return "Writes Statements to the specified Kafka topic.";
    +    }
    +
    +    @Override
    +    public boolean validArguments(final String[] args) {
    +        boolean valid = true;
    +        try {
    +            new JCommander(new WriteParameters(), args);
    +        } catch(final ParameterException e) {
    +            valid = false;
    +        }
    +        return valid;
    +    }
    +
    +    /**
    +     * @return Describes what arguments may be provided to the command.
    +     */
    +    @Override
    +    public String getUsage() {
    +        final JCommander parser = new JCommander(new WriteParameters());
    +
    +        final StringBuilder usage = new StringBuilder();
    +        parser.usage(usage);
    +        return usage.toString();
    +    }
    +
    +    @Override
    +    public void execute(final String[] args) throws ArgumentsException, ExecutionException {
    +        requireNonNull(args);
    +
    +        // Parse the command line arguments.
    +        final WriteParameters params = new WriteParameters();
    +        try {
    +            new JCommander(params, args);
    +        } catch(final ParameterException e) {
    +            throw new ArgumentsException("Could not stream the query's results because of invalid command line parameters.", e);
    +        }
    +
    +        // Verify the configured statements file path.
    +        final Path statementsPath = Paths.get(params.statementsFile);
    +        if(!statementsPath.toFile().exists()) {
    +            throw new ArgumentsException("Could not load statements at path '" + statementsPath + "' because that " +
    +                    "file does not exist. Make sure you've entered the correct path.");
    +        }
    +
    +        // Create an RDF Parser whose format is derived from the statementPath's file extension.
    +        final String filename = statementsPath.getFileName().toString();
    +        final RDFFormat format = RdfFormatUtils.forFileName(filename);
    +        if (format == null) {
    +            throw new UnsupportedRDFormatException("Unknown RDF format for the file: " + filename);
    +        }
    +        final RDFParser parser = Rio.createParser(format);
    +
    +        // Set up the producer.
    +        try(Producer<String, Set<Statement>> producer = makeProducer(params)) {
    +            // Set a handler that writes the statements to the specified kafka topic. It writes batches of 5 Statements.
    +            parser.setRDFHandler(new AbstractRDFHandler() {
    +
    +                private Set<Statement> batch = new HashSet<>(5);
    +
    +                @Override
    +                public void startRDF() throws RDFHandlerException {
    +                    log.trace("Starting loading statements.");
    +                }
    +
    +                @Override
    +                public void handleStatement(final Statement stmnt) throws RDFHandlerException {
    +                    log.trace("Adding statement.");
    +                    batch.add(stmnt);
    +
    +                    if(batch.size() == 5) {
    +                        flushBatch();
    +                    }
    +                }
    +
    +                @Override
    +                public void endRDF() throws RDFHandlerException {
    +                    if(!batch.isEmpty()) {
    +                        flushBatch();
    +                    }
    +                    log.trace("Done.");
    +                }
    +
    +                private void flushBatch() {
    +                    log.trace("Flushing batch of size " + batch.size());
    +                    producer.send(new ProducerRecord<>(params.topic, null, batch));
    +                    batch = new HashSet<>(5);
    +                    producer.flush();
    +                }
    +            });
    +
    +            // Do the parse and load.
    +            try {
    +                parser.parse(Files.newInputStream(statementsPath), "");
    +            } catch (RDFParseException | RDFHandlerException | IOException e) {
    +                throw new ExecutionException("Could not load the RDF file's Statements into the Kafka topic.", e);
    +            }
    +        }
    +    }
    +
    +    private Producer<String, Set<Statement>> makeProducer(final KafkaParameters params) {
    --- End diff --
    
    Make this method static.
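
    The body of makeProducer is not shown in the quoted diff, so the following is only a sketch of the idea behind this comment: a helper that reads nothing but its arguments can be declared static. The class name, the method name, and the bootstrapServers parameter are assumptions for illustration. The configuration keys are standard Kafka producer settings, and the serializer class names come from the imports in the diff above.

```java
import java.util.Properties;

// Hypothetical illustration only; this is not the PR's makeProducer implementation.
final class ProducerPropertiesSketch {

    // Declared static because it touches no instance state, only its argument.
    // In the command class this would typically be "private static".
    static Properties makeProducerProperties(final String bootstrapServers) {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer class name taken from the imports in the quoted diff.
        props.setProperty("value.serializer",
                "org.apache.rya.kafka.connect.api.StatementsSerializer");
        return props;
    }

    public static void main(final String[] args) {
        // No instance of the enclosing class is needed to call the helper.
        final Properties props = makeProducerProperties("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

    Declaring the helper static makes it explicit that it does not depend on the command object's state.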


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187729543
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    +            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
    +        }
    +
    +        // Use a RyaClient to see if the configured instance exists.
    +        try {
    +            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    +                    config.getUsername(),
    +                    config.getPassword().toCharArray(),
    +                    config.getClusterName(),
    +                    config.getZookeepers());
    +            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
    +
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    +            }
    +
    +        } catch (final RyaClientException e) {
    +            throw new ConnectException("Unable to determine if the Rya Instance named " +
    +                    config.getRyaInstanceName() + " has been installed.", e);
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187731146
  
    --- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java ---
    @@ -0,0 +1,187 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.client.command;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.HashSet;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.kafka.connect.api.StatementsSerializer;
    +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
    +import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.rio.RDFFormat;
    +import org.eclipse.rdf4j.rio.RDFHandlerException;
    +import org.eclipse.rdf4j.rio.RDFParseException;
    +import org.eclipse.rdf4j.rio.RDFParser;
    +import org.eclipse.rdf4j.rio.Rio;
    +import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
    +import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.beust.jcommander.JCommander;
    +import com.beust.jcommander.Parameter;
    +import com.beust.jcommander.ParameterException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect Sink format.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class WriteStatementsCommand implements RyaKafkaClientCommand {
    +    private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class);
    +
    +    /**
    +     * Command line parameters that are used by this command to configure itself.
    +     */
    +    public static class WriteParameters extends KafkaParameters {
    +        @Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.")
    +        public String statementsFile;
    +    }
    +
    +    @Override
    +    public String getCommand() {
    +        return "write";
    +    }
    +
    +    @Override
    +    public String getDescription() {
    +        return "Writes Statements to the specified Kafka topic.";
    +    }
    +
    +    @Override
    +    public boolean validArguments(final String[] args) {
    +        boolean valid = true;
    +        try {
    +            new JCommander(new WriteParameters(), args);
    +        } catch(final ParameterException e) {
    +            valid = false;
    +        }
    +        return valid;
    +    }
    +
    +    /**
    +     * @return Describes what arguments may be provided to the command.
    +     */
    +    @Override
    +    public String getUsage() {
    +        final JCommander parser = new JCommander(new WriteParameters());
    +
    +        final StringBuilder usage = new StringBuilder();
    +        parser.usage(usage);
    +        return usage.toString();
    +    }
    +
    +    @Override
    +    public void execute(final String[] args) throws ArgumentsException, ExecutionException {
    +        requireNonNull(args);
    +
    +        // Parse the command line arguments.
    +        final WriteParameters params = new WriteParameters();
    +        try {
    +            new JCommander(params, args);
    +        } catch(final ParameterException e) {
    +            throw new ArgumentsException("Could not stream the query's results because of invalid command line parameters.", e);
    +        }
    +
    +        // Verify the configured statements file path.
    +        final Path statementsPath = Paths.get(params.statementsFile);
    +        if(!statementsPath.toFile().exists()) {
    +            throw new ArgumentsException("Could not load statements at path '" + statementsPath + "' because that " +
    +                    "file does not exist. Make sure you've entered the correct path.");
    +        }
    +
    +        // Create an RDF Parser whose format is derived from the statementPath's file extension.
    +        final String filename = statementsPath.getFileName().toString();
    +        final RDFFormat format = RdfFormatUtils.forFileName(filename);
    +        if (format == null) {
    +            throw new UnsupportedRDFormatException("Unknown RDF format for the file: " + filename);
    +        }
    +        final RDFParser parser = Rio.createParser(format);
    +
    +        // Set up the producer.
    +        try(Producer<String, Set<Statement>> producer = makeProducer(params)) {
    +            // Set a handler that writes the statements to the specified kafka topic. It writes batches of 5 Statements.
    +            parser.setRDFHandler(new AbstractRDFHandler() {
    +
    +                private Set<Statement> batch = new HashSet<>(5);
    +
    +                @Override
    +                public void startRDF() throws RDFHandlerException {
    +                    log.trace("Starting loading statements.");
    +                }
    +
    +                @Override
    +                public void handleStatement(final Statement stmnt) throws RDFHandlerException {
    +                    log.trace("Adding statement.");
    +                    batch.add(stmnt);
    +
    +                    if(batch.size() == 5) {
    +                        flushBatch();
    +                    }
    +                }
    +
    +                @Override
    +                public void endRDF() throws RDFHandlerException {
    +                    if(!batch.isEmpty()) {
    +                        flushBatch();
    +                    }
    +                    log.trace("Done.");
    +                }
    +
    +                private void flushBatch() {
    +                    log.trace("Flushing batch of size " + batch.size());
    +                    producer.send(new ProducerRecord<>(params.topic, null, batch));
    +                    batch = new HashSet<>(5);
    +                    producer.flush();
    +                }
    +            });
    +
    +            // Do the parse and load.
    +            try {
    +                parser.parse(Files.newInputStream(statementsPath), "");
    +            } catch (RDFParseException | RDFHandlerException | IOException e) {
    +                throw new ExecutionException("Could not load the RDF file's Statements into the Kafka topic.", e);
    +            }
    +        }
    +    }
    +
    +    private Producer<String, Set<Statement>> makeProducer(final KafkaParameters params) {
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187657223
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    +            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
    +        }
    +
    +        // Use a RyaClient to see if the configured instance exists.
    +        try {
    +            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    +                    config.getUsername(),
    +                    config.getPassword().toCharArray(),
    +                    config.getClusterName(),
    +                    config.getZookeepers());
    +            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
    +
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    +            }
    +
    +        } catch (final RyaClientException e) {
    +            throw new ConnectException("Unable to determine if the Rya Instance named " +
    +                    config.getRyaInstanceName() + " has been installed.", e);
    +        }
    +    }
    +
    +    @Override
    +    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Move the configuration into a Rya Configuration object.
    +        final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration();
    +        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
    +        ryaConfig.setAccumuloZookeepers( config.getZookeepers() );
    +        ryaConfig.setAccumuloInstance( config.getClusterName() );
    +        ryaConfig.setAccumuloUser( config.getUsername() );
    +        ryaConfig.setAccumuloPassword( config.getPassword() );
    +
    +        // Create the Sail object.
    +        try {
    +            return RyaSailFactory.getInstance(ryaConfig);
    --- End diff --
    
    I'm under the impression that code will not work if the Geo jars aren't included on the classpath.
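
    The concern here is a dependency on optional jars being present at runtime. As a rough sketch, and not something taken from this PR, one way to find out whether an optional class is available is to probe the classpath before relying on it. The class name org.example.geo.HypotheticalGeoIndexer below is made up for illustration; it is not a real Rya class.

```java
// Hypothetical illustration only; the probed Geo class name is invented.
final class ClasspathCheckSketch {

    // Returns true if the named class can be located by this class's loader.
    // The class is located but not initialized (second argument is false).
    static boolean isOnClasspath(final String className) {
        try {
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (final ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println("java.util.Map present: "
                + isOnClasspath("java.util.Map"));
        System.out.println("optional Geo class present: "
                + isOnClasspath("org.example.geo.HypotheticalGeoIndexer"));
    }
}
```

    A check like this only reports whether the class can be found; whether the factory in question needs such a guard is a separate question that the thread leaves open.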


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187729976
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java ---
    @@ -0,0 +1,77 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.rio.RDFWriter;
    +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
    + * using the RDF4J Rio Binary format.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class StatementsSerializer implements Serializer<Set<Statement>> {
    +    private static final Logger log = LoggerFactory.getLogger(StatementsSerializer.class);
    +
    +    private static final BinaryRDFWriterFactory WRITER_FACTORY = new BinaryRDFWriterFactory();
    +
    +    @Override
    +    public void configure(final Map<String, ?> configs, final boolean isKey) {
    +        // Nothing to do.
    +    }
    +
    +    @Override
    +    public byte[] serialize(final String topic, final Set<Statement> data) {
    +        if(data == null) {
    +            // Returning null because that is the contract of this method.
    +            return null;
    +        }
    +
    +        // Write the statements using a Binary RDF Writer.
    +        final ByteArrayOutputStream boas = new ByteArrayOutputStream();
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187627726
  
    --- Diff: extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java ---
    @@ -0,0 +1,100 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.api.client.Install.InstallConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.test.accumulo.AccumuloITBase;
    +import org.junit.Test;
    +
    +/**
    + * Integration tests the methods of {@link AccumuloRyaSinkTask}.
    --- End diff --
    
    Typo. This should read:
    "...tests for the methods..."


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187729813
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java ---
    @@ -0,0 +1,92 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.rio.RDFHandler;
    +import org.eclipse.rdf4j.rio.RDFHandlerException;
    +import org.eclipse.rdf4j.rio.RDFParseException;
    +import org.eclipse.rdf4j.rio.RDFParser;
    +import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized
    + * set of {@link Statement}s.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class StatementsDeserializer implements Deserializer<Set<Statement>> {
    +    private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class);
    +
    +    private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory();
    +
    +    @Override
    +    public void configure(final Map<String, ?> configs, final boolean isKey) {
    +        // Nothing to do.
    +    }
    +
    +    @Override
    +    public Set<Statement> deserialize(final String topic, final byte[] data) {
    +        if(data == null || data.length == 0) {
    +            // Return null because that is the contract of this method.
    +            return null;
    +        }
    +
    +        try {
    +            final RDFParser parser = PARSER_FACTORY.getParser();
    +            final Set<Statement> statements = new HashSet<>();
    +
    +            parser.setRDFHandler(new RDFHandler() {
    +                @Override
    +                public void handleStatement(final Statement statement) throws RDFHandlerException {
    +                    log.debug("Statement: " + statement);
    +                    statements.add( statement );
    +                }
    +
    +                @Override public void startRDF() throws RDFHandlerException { }
    +                @Override public void handleNamespace(final String arg0, final String arg1) throws RDFHandlerException { }
    +                @Override public void handleComment(final String arg0) throws RDFHandlerException { }
    +                @Override public void endRDF() throws RDFHandlerException { }
    +            });
    +
    +            parser.parse(new ByteArrayInputStream(data), null);
    +            return statements;
    +
    +        } catch(final RDFParseException | RDFHandlerException | IOException e) {
    +            log.error("Could not deserialize a Set of VisibilityStatement objects using the RDF4J Rio Binray format.", e);
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187701955
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    +            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
    +        }
    +
    +        // Use a RyaClient to see if the configured instance exists.
    +        try {
    +            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    --- End diff --
    
    A quick nice-to-have idea for future work: we have a ConnectionDetails -> Configuration conversion, but not the other way around. Having the reverse would make code like this pretty easy.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187718927
  
    --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java ---
    @@ -0,0 +1,63 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.mongo;
    +
    +import java.util.Map;
    +
    +import org.apache.kafka.common.config.AbstractConfig;
    +import org.apache.kafka.common.config.ConfigDef;
    +import org.apache.kafka.connect.connector.Task;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class MongoRyaSinkConnector extends RyaSinkConnector {
    +
    +    @Nullable
    +    private MongoRyaSinkConfig config = null;
    +
    +    @Override
    +    public void start(final Map<String, String> props) {
    +        this.config = new MongoRyaSinkConfig( props );
    +    }
    +
    +    @Override
    +    protected AbstractConfig getConfig() {
    +        if(config == null) {
    +            throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
    --- End diff --
    
    Same here: start(Map).


---

[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/incubator-rya/pull/296
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/760/



---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187717488
  
    --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java ---
    @@ -0,0 +1,94 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.mongo;
    +
    +import java.util.Map;
    +
    +import org.apache.kafka.common.config.ConfigDef;
    +import org.apache.kafka.common.config.ConfigDef.Importance;
    +import org.apache.kafka.common.config.ConfigDef.Type;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class MongoRyaSinkConfig extends RyaSinkConfig {
    +
    +    public static final String HOSTNAME = "mongo.hostname";
    +    private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections wlll use.";
    --- End diff --
    
    typo: wlll


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187732940
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java ---
    @@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
             return Optional.fromNullable(conf.get(FLUO_APP_NAME));
         }
     
    +    public static void setUseMongo(final Configuration conf, final boolean useMongo) {
    --- End diff --
    
    I don't want to mess with how our configuration objects are initialized for the scope of this ticket.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r188060281
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java ---
    @@ -0,0 +1,92 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.rio.RDFHandler;
    +import org.eclipse.rdf4j.rio.RDFHandlerException;
    +import org.eclipse.rdf4j.rio.RDFParseException;
    +import org.eclipse.rdf4j.rio.RDFParser;
    +import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized
    + * set of {@link Statement}s.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class StatementsDeserializer implements Deserializer<Set<Statement>> {
    +    private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class);
    +
    +    private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory();
    +
    +    @Override
    +    public void configure(final Map<String, ?> configs, final boolean isKey) {
    +        // Nothing to do.
    +    }
    +
    +    @Override
    +    public Set<Statement> deserialize(final String topic, final byte[] data) {
    +        if(data == null || data.length == 0) {
    +            // Return null because that is the contract of this method.
    +            return null;
    +        }
    +
    +        try {
    +            final RDFParser parser = PARSER_FACTORY.getParser();
    +            final Set<Statement> statements = new HashSet<>();
    +
    +            parser.setRDFHandler(new RDFHandler() {
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187733815
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    +            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
    +        }
    +
    +        // Use a RyaClient to see if the configured instance exists.
    +        try {
    +            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    --- End diff --
    
    That's true. Feel free to write an improvement ticket for that.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187731624
  
    --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java ---
    @@ -274,16 +274,17 @@ public boolean getUseAggregationPipeline() {
          * on their child subtrees.
          * @param value whether to use aggregation pipeline optimization.
          */
    -    public void setUseAggregationPipeline(boolean value) {
    +    public void setUseAggregationPipeline(final boolean value) {
             setBoolean(USE_AGGREGATION_PIPELINE, value);
         }
     
         @Override
         public List<Class<QueryOptimizer>> getOptimizers() {
    -        List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
    +        final List<Class<QueryOptimizer>> optimizers = super.getOptimizers();
             if (getUseAggregationPipeline()) {
    -            Class<?> cl = AggregationPipelineQueryOptimizer.class;
    +            final Class<?> cl = AggregationPipelineQueryOptimizer.class;
                 @SuppressWarnings("unchecked")
    +            final
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187980159
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java ---
    @@ -0,0 +1,92 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.rio.RDFHandler;
    +import org.eclipse.rdf4j.rio.RDFHandlerException;
    +import org.eclipse.rdf4j.rio.RDFParseException;
    +import org.eclipse.rdf4j.rio.RDFParser;
    +import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized
    + * set of {@link Statement}s.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class StatementsDeserializer implements Deserializer<Set<Statement>> {
    +    private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class);
    +
    +    private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory();
    +
    +    @Override
    +    public void configure(final Map<String, ?> configs, final boolean isKey) {
    +        // Nothing to do.
    +    }
    +
    +    @Override
    +    public Set<Statement> deserialize(final String topic, final byte[] data) {
    +        if(data == null || data.length == 0) {
    +            // Return null because that is the contract of this method.
    +            return null;
    +        }
    +
    +        try {
    +            final RDFParser parser = PARSER_FACTORY.getParser();
    +            final Set<Statement> statements = new HashSet<>();
    +
    +            parser.setRDFHandler(new RDFHandler() {
    --- End diff --
    
    Use AbstractRDFHandler, since only handleStatement() needs to be overridden; the other overridden methods can then be deleted.
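
    To illustrate the suggestion, here is a minimal, self-contained sketch of the adapter idea. It does not use RDF4J itself: StatementHandler and AbstractStatementHandler are hypothetical stand-ins for RDFHandler and AbstractRDFHandler, and statements are plain strings, so it only shows the shape of the change rather than the actual deserializer code.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a multi-callback handler interface such as RDF4J's RDFHandler.
interface StatementHandler {
    void startRDF();
    void handleNamespace(String prefix, String uri);
    void handleComment(String comment);
    void handleStatement(String statement);
    void endRDF();
}

// Hypothetical stand-in for an adapter such as AbstractRDFHandler: every callback is a no-op.
abstract class AbstractStatementHandler implements StatementHandler {
    @Override public void startRDF() { }
    @Override public void handleNamespace(final String prefix, final String uri) { }
    @Override public void handleComment(final String comment) { }
    @Override public void handleStatement(final String statement) { }
    @Override public void endRDF() { }
}

final class HandlerAdapterSketch {

    // Collects the statements reported to the handler, overriding only the one callback we need.
    static Set<String> collect(final List<String> parsed) {
        final Set<String> statements = new LinkedHashSet<>();

        final StatementHandler handler = new AbstractStatementHandler() {
            @Override
            public void handleStatement(final String statement) {
                statements.add(statement);
            }
        };

        // Simulate what a parser would do: start, report each statement, end.
        handler.startRDF();
        for (final String statement : parsed) {
            handler.handleStatement(statement);
        }
        handler.endRDF();
        return statements;
    }

    public static void main(final String[] args) {
        System.out.println(collect(Arrays.asList("s1", "s2", "s1")));
    }
}
```

    Compared with implementing the full interface inline, the anonymous class shrinks to the single method that does real work, which is the point of the review comment.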


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187739974
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
    @@ -0,0 +1,144 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api.sink;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.kafka.connect.sink.SinkRecord;
    +import org.apache.kafka.connect.sink.SinkTask;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.repository.sail.SailRepository;
    +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.jcabi.manifests.Manifests;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * Handles the common components required to write {@link Statement}s to Rya.
    + * <p/>
    + * Implementations of this class only need to specify functionality that is specific to the
    + * Rya implementation.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public abstract class RyaSinkTask extends SinkTask {
    +    private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
    +
    +    @Nullable
    +    private SailRepository sailRepo = null;
    +
    +    @Nullable
    +    private SailRepositoryConnection conn = null;
    +
    +    /**
    +     * Throws an exception if the configured Rya Instance is not already installed
    +     * within the configured database.
    +     *
    +     * @param taskConfig - The configuration values that were provided to the task. (not null)
    +     * @throws ConnectException The configured Rya Instance is not installed to the configured database
    +     *   or we were unable to figure out if it is installed.
    +     */
    +    protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
    +
    +    /**
    +     * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
    +     * Rya Instance.
    +     *
    +     * @param taskConfig - Configures how the Sail object will be created. (not null)
    +     * @return The created Sail object.
    +     * @throws ConnectException The Sail object could not be made.
    +     */
    +    protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
    +
    +    @Override
    +    public String version() {
    +        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
    +    }
    +
    +    @Override
    +    public void start(final Map<String, String> props) throws ConnectException {
    +        requireNonNull(props);
    +
    +        // Ensure the configured Rya Instance is installed within the configured database.
    +        checkRyaInstanceExists(props);
    +
    +        // Create the Sail object that is connected to the Rya Instance.
    +        final Sail sail = makeSail(props);
    +        sailRepo = new SailRepository( sail );
    +        conn = sailRepo.getConnection();
    +    }
    +
    +    @Override
    +    public void put(final Collection<SinkRecord> records) {
    +        requireNonNull(records);
    +
    +        // Return immediately if there are no records to handle.
    +        if(records.isEmpty()) {
    +            return;
    +        }
    +
    +        // If a transaction has not been started yet, then start one.
    +        if(!conn.isActive()) {
    +            conn.begin();
    +        }
    +
    +        // Iterate through the records and write them to the Sail object.
    +        for(final SinkRecord record : records) {
    +            // If everything has been configured correctly, then the record's value will be a Set<Statement>.
    +            conn.add((Set<? extends Statement>) record.value());
    +        }
    +    }
    +
    +    @Override
    +    public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffets) {
    +        // Flush the current transaction.
    +        conn.commit();
    +    }
    +
    +    @Override
    +    public void stop() {
    +        try {
    --- End diff --
    
    👍 


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187735067
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
    @@ -0,0 +1,144 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api.sink;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.kafka.connect.sink.SinkRecord;
    +import org.apache.kafka.connect.sink.SinkTask;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.repository.sail.SailRepository;
    +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.jcabi.manifests.Manifests;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * Handles the common components required to write {@link Statement}s to Rya.
    + * <p/>
    + * Implementations of this class only need to specify functionality that is specific to the
    + * Rya implementation.
    + */
    +@DefaultAnnotation(NonNull.class)
    --- End diff --
    
    "/**
     * Indicates that all members of the class or package should be annotated with the default value of the supplied
     * annotation class. This would be used for behavior annotations such as @NonNull, @CheckForNull,
     * or @CheckReturnValue. In particular, you can use @DefaultAnnotation(NonNull.class) on a class or package,
     * and then use @Nullable only on those parameters, methods or fields that you want to allow to be null.
     */"
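
A small sketch of the convention that Javadoc describes, for anyone unfamiliar with it. The annotations `NonNullByDefault` and `MayBeNull` below are hypothetical stand-ins declared locally so the example compiles without the FindBugs annotations jar; in the PR the real ones are `@DefaultAnnotation(NonNull.class)` and `@Nullable`. `InstanceRegistry` is likewise made up. Annotations of this kind are hints for static analysis, so the sketch still uses `Objects.requireNonNull` for the runtime check.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for @DefaultAnnotation(NonNull.class).
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
@interface NonNullByDefault {}

// Hypothetical stand-in for @Nullable.
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
@interface MayBeNull {}

// Everything in this class is treated as non-null unless marked otherwise.
@NonNullByDefault
class InstanceRegistry {
    private final Map<String, String> hosts = new HashMap<>();

    // Both parameters are non-null by default, so they are checked eagerly.
    void register(final String name, final String host) {
        hosts.put(Objects.requireNonNull(name), Objects.requireNonNull(host));
    }

    // The one place null is allowed is called out explicitly.
    @MayBeNull
    String lookup(final String name) {
        return hosts.get(Objects.requireNonNull(name));
    }
}
```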


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187734176
  
    --- Diff: extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java ---
    @@ -0,0 +1,42 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
    +import org.junit.Test;
    +
    +/**
    + * Unit tests the methods of {@link AccumuloRyaSinkConfig}.
    + */
    +public class AccumuloRyaSinkConfigTest {
    +
    +    @Test
    +    public void parses() {
    --- End diff --
    
    Could you give an example of a malformed field? Do you just mean fields that are not part of the schema? That's not illegal. They just get ignored.
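
To make the distinction concrete, here is a dependency-free sketch of the behavior described above: keys that are not part of the schema are simply ignored. `SimpleSinkConfig` and its key names are hypothetical, and nothing here is Kafka's actual `ConfigDef` or `AbstractConfig` code. The failure on a missing key is an assumption about how a schema with required fields would behave, included only to show what an error case could look like.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of schema-based configuration parsing.
class SimpleSinkConfig {
    static final String HOSTNAME = "hostname";
    static final String INSTANCE_NAME = "instance.name";

    // The schema: every key listed here is required in this sketch.
    private static final Set<String> SCHEMA =
            new HashSet<>(Arrays.asList(HOSTNAME, INSTANCE_NAME));

    private final Map<String, String> values = new HashMap<>();

    SimpleSinkConfig(final Map<String, String> props) {
        for (final String key : SCHEMA) {
            final String value = props.get(key);
            if (value == null) {
                // Assumed error case: a schema key without a value.
                throw new IllegalArgumentException("Missing required configuration: " + key);
            }
            values.put(key, value);
        }
        // Keys outside the schema are never read, so they are dropped silently.
    }

    String get(final String key) {
        return values.get(key);
    }
}
```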


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187636404
  
    --- Diff: extras/kafka.connect/client/pom.xml ---
    @@ -0,0 +1,135 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one
    +  or more contributor license agreements.  See the NOTICE file
    +  distributed with this work for additional information
    +  regarding copyright ownership.  The ASF licenses this file
    +  to you under the Apache License, Version 2.0 (the
    +  "License"); you may not use this file except in compliance
    +  with the License.  You may obtain a copy of the License at
    +
    +    http://www.apache.org/licenses/LICENSE-2.0
    +
    +  Unless required by applicable law or agreed to in writing,
    +  software distributed under the License is distributed on an
    +  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +  KIND, either express or implied.  See the License for the
    +  specific language governing permissions and limitations
    +  under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" 
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.rya</groupId>
    +        <artifactId>rya.kafka.connect.parent</artifactId>
    +        <version>4.0.0-incubating-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>rya.kafka.connect.client</artifactId>
    +
    +    <name>Apache Rya Kafka Connect - Client</name>
    +    <description>Contains a client that may be used to load Statements into 
    +                 a Kafka topic to be read by Kafka Connect.</description>
    +
    +    <dependencies>
    +        <!-- 1st party dependencies. -->
    +        <dependency>
    +            <groupId>org.apache.rya</groupId>
    +            <artifactId>rya.sail</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.rya</groupId>
    +            <artifactId>rya.kafka.connect.api</artifactId>
    +        </dependency>
    +    
    +        <!-- 3rd party dependencies. -->
    +        <dependency>
    +            <groupId>org.eclipse.rdf4j</groupId>
    +            <artifactId>rdf4j-model</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>com.google.guava</groupId>
    +            <artifactId>guava</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>com.beust</groupId>
    +            <artifactId>jcommander</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>com.github.stephenc.findbugs</groupId>
    +            <artifactId>findbugs-annotations</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.kafka</groupId>
    +            <artifactId>kafka-clients</artifactId>
    +        </dependency>
    +        
    +        <!-- Statement formats we support for loading. -->
    --- End diff --
    
    I think pulling in rya.sail as a dependency will give you all the RDF Formats.  So, this probably isn't needed.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187729157
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/incubator-rya/pull/296
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/761/



---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187717175
  
    --- Diff: extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java ---
    @@ -0,0 +1,97 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.mongo;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.api.client.Install.InstallConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.mongo.MongoConnectionDetails;
    +import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
    +import org.apache.rya.test.mongo.MongoITBase;
    +import org.junit.Test;
    +
    +/**
    + * Integration tests the methods of {@link MongoRyaSinkTask}.
    + */
    +public class MongoRyaSinkTaskIT extends MongoITBase {
    +
    +    @Test
    +    public void instanceExists() throws Exception {
    +        // Install an instance of Rya.
    +        final String ryaInstanceName = "rya";
    +        final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
    +                super.getMongoHostname(),
    +                super.getMongoPort(),
    +                Optional.empty(),
    +                Optional.empty());
    +
    +        final InstallConfiguration installConfig = InstallConfiguration.builder()
    +                .setEnableTableHashPrefix(false)
    +                .setEnableEntityCentricIndex(false)
    +                .setEnableFreeTextIndex(false)
    +                .setEnableTemporalIndex(false)
    +                .setEnablePcjIndex(false)
    +                .setEnableGeoIndex(false)
    +                .build();
    +
    +        final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, super.getMongoClient());
    +        ryaClient.getInstall().install(ryaInstanceName, installConfig);
    +
    +        // Create the task that will be tested.
    +        final MongoRyaSinkTask task = new MongoRyaSinkTask();
    +
    +        try {
    +            // Configure the task to use the embedded Mongo DB instance for Rya.
    +            final Map<String, String> config = new HashMap<>();
    +            config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
    +            config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
    +            config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "rya");
    +
    +            // This will pass because the Rya instance exists.
    +            task.start(config);
    +        } finally {
    +            task.stop();
    +        }
    +    }
    +
    +    @Test(expected = ConnectException.class)
    +    public void instanceDoesNotExist() throws Exception {
    +        // Create the task that will be tested.
    +        final MongoRyaSinkTask task = new MongoRyaSinkTask();
    +
    +        try {
    +            // Configure the task to use the embedded Mongo DB instance for Rya.
    +            final Map<String, String> config = new HashMap<>();
    +            config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
    +            config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
    +            config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "instance-does-not-exist");
    +
    +            // Starting the task will fail because the Rya instance does not exist.
    +            task.start(config);
    +        } finally {
    +            task.stop();
    +        }
    +    }
    +
    +    // TODO show that inserts using visibilities work.
    --- End diff --
    
    TODO?


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187733573
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java ---
    @@ -0,0 +1,63 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import java.util.Map;
    +
    +import org.apache.kafka.common.config.AbstractConfig;
    +import org.apache.kafka.common.config.ConfigDef;
    +import org.apache.kafka.connect.connector.Task;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
    + */
    +@DefaultAnnotation(NonNull.class)
    --- End diff --
    
    That field is nullable because this is a stateful object, but the parameters into the start(...) function may not be null. I'll add a null check there.
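
A rough sketch of the lifecycle being discussed, assuming nothing beyond the JDK. `StatefulConnector` is a hypothetical stand-in for `AccumuloRyaSinkConnector`, and a plain `Map` stands in for `AccumuloRyaSinkConfig`. The field starts out null because the object has no configuration until it is started, while the argument to `start(...)` is rejected if it is null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a connector whose configuration arrives after construction.
class StatefulConnector {

    // Null until start(...) has been invoked, which is why the field is nullable.
    private Map<String, String> config = null;

    void start(final Map<String, String> props) {
        // The parameter itself may never be null.
        Objects.requireNonNull(props);
        this.config = new HashMap<>(props);
    }

    Map<String, String> getConfig() {
        if (config == null) {
            throw new IllegalStateException(
                    "The configuration has not been set yet. Invoke start(Map) first.");
        }
        return config;
    }
}
```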


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187732007
  
    --- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java ---
    @@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() {
             super();
         }
     
    -    public AccumuloRdfConfiguration(Configuration other) {
    +    public AccumuloRdfConfiguration(final Configuration other) {
             super(other);
         }
     
    -    public AccumuloRdfConfigurationBuilder getBuilder() {
    +    public static AccumuloRdfConfigurationBuilder getBuilder() {
    --- End diff --
    
    For me it is. I don't really want to refactor the entire method name in this review, though. I just needed it to be static so that I could use it.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187708244
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java ---
    @@ -0,0 +1,63 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import java.util.Map;
    +
    +import org.apache.kafka.common.config.AbstractConfig;
    +import org.apache.kafka.common.config.ConfigDef;
    +import org.apache.kafka.connect.connector.Task;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkConnector extends RyaSinkConnector {
    +
    +    @Nullable
    +    private AccumuloRyaSinkConfig config = null;
    +
    +    @Override
    +    public void start(final Map<String, String> props) {
    +        this.config = new AccumuloRyaSinkConfig( props );
    +    }
    +
    +    @Override
    +    protected AbstractConfig getConfig() {
    +        if(config == null) {
    +            throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
    --- End diff --
    
    Usually a documented method reference lists the parameter types, not the parameter names: start(Map)


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187698795
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java ---
    @@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
             return Optional.fromNullable(conf.get(FLUO_APP_NAME));
         }
     
    +    public static void setUseMongo(final Configuration conf, final boolean useMongo) {
    --- End diff --
    
    Can you add this to the MongoDBRdfConfiguration constructor? If we're going to keep using this field, it would make sense for the constructor to set it.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187739762
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java ---
    @@ -0,0 +1,77 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.rio.RDFWriter;
    +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
    + * using the RDF4J Rio Binary format.
    --- End diff --
    
    fair enough, it just seemed like you are delegating the actual serialization to something else.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187739445
  
    --- Diff: extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java ---
    @@ -0,0 +1,42 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
    +import org.junit.Test;
    +
    +/**
    + * Unit tests the methods of {@link AccumuloRyaSinkConfig}.
    + */
    +public class AccumuloRyaSinkConfigTest {
    +
    +    @Test
    +    public void parses() {
    --- End diff --
    
    Gotcha, the test just seemed to be lacking an error case.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187731454
  
    --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java ---
    @@ -0,0 +1,122 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.mongo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Arrays;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.mongo.MongoConnectionDetails;
    +import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.indexing.accumulo.ConfigUtils;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.mongodb.MongoDBRdfConfiguration;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.MongoClient;
    +import com.mongodb.MongoCredential;
    +import com.mongodb.ServerAddress;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class MongoRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
    +        @Nullable
    +        final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
    +        @Nullable
    +        final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
    +
    +        // Connect a Mongo Client to the configured Mongo DB instance.
    +        final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
    +        final boolean hasCredentials = username != null && password != null;
    +
    +        try(MongoClient mongoClient = hasCredentials ?
    +                new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
    +                new MongoClient(serverAddr)) {
    +            // Use a RyaClient to see if the configured instance exists.
    +            // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
    +            final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
    +                    config.getHostname(),
    +                    config.getPort(),
    +                    Optional.ofNullable(username),
    +                    Optional.ofNullable(password));
    +
    +            final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    +            }
    +        } catch(final RyaClientException e) {
    +            throw new ConnectException("Unable to determine if the Rya Instance named " +
    +                    config.getRyaInstanceName() + " has been installed.", e);
    +        }
    +    }
    +
    +    @Override
    +    protected Sail makeSail(final Map<String, String> taskConfig) {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
    +
    +        // Move the configuration into a Rya Configuration object.
    +        final MongoDBRdfConfiguration ryaConfig = new MongoDBRdfConfiguration();
    +        ConfigUtils.setUseMongo(ryaConfig, true);
    +        ryaConfig.setMongoDBName( config.getRyaInstanceName() );
    +        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
    +        ryaConfig.setMongoHostname( config.getHostname() );
    +        ryaConfig.setMongoPort( "" + config.getPort() );
    +
    +        if(!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) {
    +            ryaConfig.setMongoUser( config.getUsername() );
    +            ryaConfig.setMongoPassword( config.getPassword() );
    +        }
    +
    +        // Create the Sail object.
    +        try {
    +            return RyaSailFactory.getInstance(ryaConfig);
    --- End diff --
    
    Added documentation to the future work section of the manual. Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187730101
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java ---
    @@ -0,0 +1,69 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api.sink;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.config.AbstractConfig;
    +import org.apache.kafka.connect.sink.SinkConnector;
    +
    +import com.jcabi.manifests.Manifests;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * Handles the common components required to task {@link RyaSinkTask}s that write to Rya.
    + * </p>
    + * Implementations of this class only need to specify functionality that is specific to the Rya implementation.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public abstract class RyaSinkConnector extends SinkConnector {
    +
    +    /**
    +     * Get the configuration that will be provided to the tasks when {@link #taskConfigs(int)} is invoked.
    +     * </p>
    +     * Only called after start has been invoked
    +     *
    +     * @return The configuration object for the connector.
    +     * @throws IllegalStateException Thrown if {@link SinkConnector#start(Map)} has not been invoked yet.
    +     */
    +    protected abstract AbstractConfig getConfig() throws IllegalStateException;
    +
    +    @Override
    +    public String version() {
    +        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN";
    +    }
    +
    +    @Override
    +    public List<Map<String, String>> taskConfigs(final int maxTasks) {
    +        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
    +        for(int i = 0; i < maxTasks; i++) {
    +            configs.add( getConfig().originalsStrings() );
    +        }
    +        return configs;
    +    }
    +
    +    @Override
    +    public void stop() {
    +        // Nothing to do since the RyaSinkconnector has no background monitoring.
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187730515
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
    @@ -0,0 +1,144 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api.sink;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.kafka.connect.sink.SinkRecord;
    +import org.apache.kafka.connect.sink.SinkTask;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.repository.sail.SailRepository;
    +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.jcabi.manifests.Manifests;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * Handles the common components required to write {@link Statement}s to Rya.
    + * <p/>
    + * Implementations of this class only need to specify functionality that is specific to the
    + * Rya implementation.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public abstract class RyaSinkTask extends SinkTask {
    +    private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
    +
    +    @Nullable
    +    private SailRepository sailRepo = null;
    +
    +    @Nullable
    +    private SailRepositoryConnection conn = null;
    +
    +    /**
    +     * Throws an exception if the configured Rya Instance is not already installed
    +     * within the configured database.
    +     *
    +     * @param taskConfig - The configuration values that were provided to the task. (not null)
    +     * @throws ConnectException The configured Rya Instance is not installed to the configured database
    +     *   or we were unable to figure out if it is installed.
    +     */
    +    protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
    +
    +    /**
    +     * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
    +     * Rya Instance.
    +     *
    +     * @param taskConfig - Configures how the Sail object will be created. (not null)
    +     * @return The created Sail object.
    +     * @throws ConnectException The Sail object could not be made.
    +     */
    +    protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
    +
    +    @Override
    +    public String version() {
    +        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
    +    }
    +
    +    @Override
    +    public void start(final Map<String, String> props) throws ConnectException {
    +        requireNonNull(props);
    +
    +        // Ensure the configured Rya Instance is installed within the configured database.
    +        checkRyaInstanceExists(props);
    +
    +        // Create the Sail object that is connected to the Rya Instance.
    +        final Sail sail = makeSail(props);
    +        sailRepo = new SailRepository( sail );
    +        conn = sailRepo.getConnection();
    +    }
    +
    +    @Override
    +    public void put(final Collection<SinkRecord> records) {
    +        requireNonNull(records);
    +
    +        // Return immediately if there are no records to handle.
    +        if(records.isEmpty()) {
    +            return;
    +        }
    +
    +        // If a transaction has not been started yet, then start one.
    +        if(!conn.isActive()) {
    +            conn.begin();
    +        }
    +
    +        // Iterate through the records and write them to the Sail object.
    +        for(final SinkRecord record : records) {
    +            // If everything has been configured correctly, then the record's value will be a Set<Statement>.
    +            conn.add((Set<? extends Statement>) record.value());
    +        }
    +    }
    +
    +    @Override
    +    public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffets) {
    +        // Flush the current transaction.
    +        conn.commit();
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187634433
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java ---
    @@ -0,0 +1,144 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api.sink;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.kafka.connect.sink.SinkRecord;
    +import org.apache.kafka.connect.sink.SinkTask;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.repository.sail.SailRepository;
    +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.jcabi.manifests.Manifests;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * Handles the common components required to write {@link Statement}s to Rya.
    + * <p/>
    + * Implementations of this class only need to specify functionality that is specific to the
    + * Rya implementation.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public abstract class RyaSinkTask extends SinkTask {
    +    private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
    +
    +    @Nullable
    +    private SailRepository sailRepo = null;
    +
    +    @Nullable
    +    private SailRepositoryConnection conn = null;
    +
    +    /**
    +     * Throws an exception if the configured Rya Instance is not already installed
    +     * within the configured database.
    +     *
    +     * @param taskConfig - The configuration values that were provided to the task. (not null)
    +     * @throws ConnectException The configured Rya Instance is not installed to the configured database
    +     *   or we were unable to figure out if it is installed.
    +     */
    +    protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
    +
    +    /**
    +     * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
    +     * Rya Instance.
    +     *
    +     * @param taskConfig - Configures how the Sail object will be created. (not null)
    +     * @return The created Sail object.
    +     * @throws ConnectException The Sail object could not be made.
    +     */
    +    protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
    +
    +    @Override
    +    public String version() {
    +        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
    +    }
    +
    +    @Override
    +    public void start(final Map<String, String> props) throws ConnectException {
    +        requireNonNull(props);
    +
    +        // Ensure the configured Rya Instance is installed within the configured database.
    +        checkRyaInstanceExists(props);
    +
    +        // Create the Sail object that is connected to the Rya Instance.
    +        final Sail sail = makeSail(props);
    +        sailRepo = new SailRepository( sail );
    +        conn = sailRepo.getConnection();
    +    }
    +
    +    @Override
    +    public void put(final Collection<SinkRecord> records) {
    +        requireNonNull(records);
    +
    +        // Return immediately if there are no records to handle.
    +        if(records.isEmpty()) {
    +            return;
    +        }
    +
    +        // If a transaction has not been started yet, then start one.
    +        if(!conn.isActive()) {
    +            conn.begin();
    +        }
    +
    +        // Iterate through the records and write them to the Sail object.
    +        for(final SinkRecord record : records) {
    +            // If everything has been configured correctly, then the record's value will be a Set<Statement>.
    +            conn.add((Set<? extends Statement>) record.value());
    +        }
    +    }
    +
    +    @Override
    +    public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffets) {
    +        // Flush the current transaction.
    +        conn.commit();
    --- End diff --
    
    Add a null check here: conn is null until start() has been invoked.
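    
    A minimal sketch of the kind of guard meant here. TxConnection and NullSafeFlushTask are hypothetical stand-ins (for SailRepositoryConnection and RyaSinkTask) so the snippet compiles on its own, and the extra isActive() check is an assumption on top of the null check, not something the PR requires:
    ```java
    // Hypothetical stand-in for SailRepositoryConnection so the sketch is self-contained.
    interface TxConnection {
        boolean isActive();
        void commit();
    }

    // Hypothetical, trimmed-down task; only the parts relevant to flush() are shown.
    class NullSafeFlushTask {
        // Null until start() has been invoked.
        private TxConnection conn = null;

        void start(final TxConnection connection) {
            conn = connection;
        }

        void flush() {
            // Nothing to commit if the task was never started or no transaction is open.
            if (conn == null || !conn.isActive()) {
                return;
            }
            conn.commit();
        }
    }
    ```
    Whether flush() should return quietly or throw when conn is null is a judgment call; returning quietly is just the simpler option for the sketch.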


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187731377
  
    --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java ---
    @@ -0,0 +1,122 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.mongo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Arrays;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.mongo.MongoConnectionDetails;
    +import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.indexing.accumulo.ConfigUtils;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.mongodb.MongoDBRdfConfiguration;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.MongoClient;
    +import com.mongodb.MongoCredential;
    +import com.mongodb.ServerAddress;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class MongoRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
    +        @Nullable
    +        final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
    +        @Nullable
    +        final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
    +
    +        // Connect a Mongo Client to the configured Mongo DB instance.
    +        final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
    +        final boolean hasCredentials = username != null && password != null;
    +
    +        try(MongoClient mongoClient = hasCredentials ?
    +                new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
    +                new MongoClient(serverAddr)) {
    +            // Use a RyaClient to see if the configured instance exists.
    +            // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
    +            final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
    +                    config.getHostname(),
    +                    config.getPort(),
    +                    Optional.ofNullable(username),
    +                    Optional.ofNullable(password));
    +
    +            final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187736039
  
    --- Diff: extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java ---
    @@ -0,0 +1,97 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.mongo;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.api.client.Install.InstallConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.mongo.MongoConnectionDetails;
    +import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
    +import org.apache.rya.test.mongo.MongoITBase;
    +import org.junit.Test;
    +
    +/**
    + * Integration tests the methods of {@link MongoRyaSinkTask}.
    + */
    +public class MongoRyaSinkTaskIT extends MongoITBase {
    +
    +    @Test
    +    public void instanceExists() throws Exception {
    +        // Install an instance of Rya.
    +        final String ryaInstanceName = "rya";
    +        final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
    +                super.getMongoHostname(),
    +                super.getMongoPort(),
    +                Optional.empty(),
    +                Optional.empty());
    +
    +        final InstallConfiguration installConfig = InstallConfiguration.builder()
    +                .setEnableTableHashPrefix(false)
    +                .setEnableEntityCentricIndex(false)
    +                .setEnableFreeTextIndex(false)
    +                .setEnableTemporalIndex(false)
    +                .setEnablePcjIndex(false)
    +                .setEnableGeoIndex(false)
    +                .build();
    +
    +        final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, super.getMongoClient());
    +        ryaClient.getInstall().install(ryaInstanceName, installConfig);
    +
    +        // Create the task that will be tested.
    +        final MongoRyaSinkTask task = new MongoRyaSinkTask();
    +
    +        try {
    +            // Configure the task to use the embedded Mongo DB instance for Rya.
    +            final Map<String, String> config = new HashMap<>();
    +            config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
    +            config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
    +            config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "rya");
    +
    +            // This will pass because the Rya instance exists.
    +            task.start(config);
    +        } finally {
    +            task.stop();
    +        }
    +    }
    +
    +    @Test(expected = ConnectException.class)
    +    public void instanceDoesNotExist() throws Exception {
    +        // Create the task that will be tested.
    +        final MongoRyaSinkTask task = new MongoRyaSinkTask();
    +
    +        try {
    +            // Configure the task to use the embedded Mongo DB instance for Rya.
    +            final Map<String, String> config = new HashMap<>();
    +            config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
    +            config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
    +            config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "instance-does-not-exist");
    +
    +            // Starting the task will fail because the Rya instance does not exist.
    +            task.start(config);
    +        } finally {
    +            task.stop();
    +        }
    +    }
    +
    +    // TODO show that inserts using visibilities work.
    --- End diff --
    
    Oh yeah, we're not supporting that. Let me delete that comment.


---

[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/incubator-rya/pull/296
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/762/



---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187631961
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    +            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
    +        }
    +
    +        // Use a RyaClient to see if the configured instance exists.
    +        try {
    +            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    +                    config.getUsername(),
    +                    config.getPassword().toCharArray(),
    +                    config.getClusterName(),
    +                    config.getZookeepers());
    +            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
    +
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    +            }
    +
    +        } catch (final RyaClientException e) {
    +            throw new ConnectException("Unable to determine if the Rya Instance named " +
    +                    config.getRyaInstanceName() + " has been installed.", e);
    +        }
    +    }
    +
    +    @Override
    +    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Move the configuration into a Rya Configuration object.
    +        final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration();
    +        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
    +        ryaConfig.setAccumuloZookeepers( config.getZookeepers() );
    +        ryaConfig.setAccumuloInstance( config.getClusterName() );
    +        ryaConfig.setAccumuloUser( config.getUsername() );
    +        ryaConfig.setAccumuloPassword( config.getPassword() );
    +
    +        // Create the Sail object.
    +        try {
    +            return RyaSailFactory.getInstance(ryaConfig);
    --- End diff --
    
    This should support geo as well:
    ```
    if (OptionalConfigUtils.getUseGeo(ryaConfig)) {
        return GeoRyaSailFactory.getInstance(ryaConfig);
    } else {
        return RyaSailFactory.getInstance(ryaConfig);
    }
    ```
    We really need a better mechanism than this.  Maybe someday we'll make plugins for different Sails.
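    
    One possible shape for such a mechanism, purely as a sketch. SailPlugin and SailPluginRegistry are hypothetical names invented for illustration, the configuration is reduced to a Map<String, String>, and the type parameter S stands in for Sail so the snippet compiles without Rya or RDF4J on the classpath:
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Hypothetical plugin contract, invented for this sketch.
    interface SailPlugin<S> {
        // True if this plugin should build the Sail for the given configuration.
        boolean supports(Map<String, String> conf);

        S create(Map<String, String> conf);
    }

    // Hypothetical registry: asks each registered plugin in order, then falls back to a default.
    class SailPluginRegistry<S> {
        private final List<SailPlugin<S>> plugins = new ArrayList<>();
        private final SailPlugin<S> fallback;

        SailPluginRegistry(final SailPlugin<S> fallback) {
            this.fallback = fallback;
        }

        void register(final SailPlugin<S> plugin) {
            plugins.add(plugin);
        }

        S create(final Map<String, String> conf) {
            for (final SailPlugin<S> plugin : plugins) {
                if (plugin.supports(conf)) {
                    return plugin.create(conf);
                }
            }
            return fallback.create(conf);
        }
    }
    ```
    With something like this, a geo plugin would be registered once and the sink tasks would call the registry instead of branching on each optional feature.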


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187739297
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java ---
    @@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
             return Optional.fromNullable(conf.get(FLUO_APP_NAME));
         }
     
    +    public static void setUseMongo(final Configuration conf, final boolean useMongo) {
    --- End diff --
    
    Sure, makes sense.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187728842
  
    --- Diff: extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java ---
    @@ -0,0 +1,100 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.api.client.Install.InstallConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.test.accumulo.AccumuloITBase;
    +import org.junit.Test;
    +
    +/**
    + * Integration tests the methods of {@link AccumuloRyaSinkTask}.
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187699543
  
    --- Diff: extras/kafka.connect/README.md ---
    @@ -0,0 +1,22 @@
    +<!-- Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License. -->
    +
    +The parent project for all Rya Kafka Connect work. All projects thare are part 
    +of that system must use this project's pom as their parent pom.
    --- End diff --
    
    Typo: "thare" should be "that" ("All projects that are...").


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187632758
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java ---
    @@ -0,0 +1,92 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.rio.RDFHandler;
    +import org.eclipse.rdf4j.rio.RDFHandlerException;
    +import org.eclipse.rdf4j.rio.RDFParseException;
    +import org.eclipse.rdf4j.rio.RDFParser;
    +import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized
    + * set of {@link Statement}s.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class StatementsDeserializer implements Deserializer<Set<Statement>> {
    +    private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class);
    +
    +    private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory();
    +
    +    @Override
    +    public void configure(final Map<String, ?> configs, final boolean isKey) {
    +        // Nothing to do.
    +    }
    +
    +    @Override
    +    public Set<Statement> deserialize(final String topic, final byte[] data) {
    +        if(data == null || data.length == 0) {
    +            // Return null because that is the contract of this method.
    +            return null;
    +        }
    +
    +        try {
    +            final RDFParser parser = PARSER_FACTORY.getParser();
    +            final Set<Statement> statements = new HashSet<>();
    +
    +            parser.setRDFHandler(new RDFHandler() {
    +                @Override
    +                public void handleStatement(final Statement statement) throws RDFHandlerException {
    +                    log.debug("Statement: " + statement);
    +                    statements.add( statement );
    +                }
    +
    +                @Override public void startRDF() throws RDFHandlerException { }
    +                @Override public void handleNamespace(final String arg0, final String arg1) throws RDFHandlerException { }
    +                @Override public void handleComment(final String arg0) throws RDFHandlerException { }
    +                @Override public void endRDF() throws RDFHandlerException { }
    +            });
    +
    +            parser.parse(new ByteArrayInputStream(data), null);
    +            return statements;
    +
    +        } catch(final RDFParseException | RDFHandlerException | IOException e) {
    +            log.error("Could not deserialize a Set of VisibilityStatement objects using the RDF4J Rio Binray format.", e);
    --- End diff --
    
    Typo: "Binray" should be "Binary".


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187657023
  
    --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java ---
    @@ -0,0 +1,122 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.mongo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Arrays;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.mongo.MongoConnectionDetails;
    +import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.indexing.accumulo.ConfigUtils;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.mongodb.MongoDBRdfConfiguration;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.MongoClient;
    +import com.mongodb.MongoCredential;
    +import com.mongodb.ServerAddress;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class MongoRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
    +        @Nullable
    +        final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
    +        @Nullable
    +        final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
    +
    +        // Connect a Mongo Client to the configured Mongo DB instance.
    +        final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
    +        final boolean hasCredentials = username != null && password != null;
    +
    +        try(MongoClient mongoClient = hasCredentials ?
    +                new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
    +                new MongoClient(serverAddr)) {
    +            // Use a RyaClient to see if the configured instance exists.
    +            // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
    +            final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
    +                    config.getHostname(),
    +                    config.getPort(),
    +                    Optional.ofNullable(username),
    +                    Optional.ofNullable(password));
    +
    +            final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    +            }
    +        } catch(final RyaClientException e) {
    +            throw new ConnectException("Unable to determine if the Rya Instance named " +
    +                    config.getRyaInstanceName() + " has been installed.", e);
    +        }
    +    }
    +
    +    @Override
    +    protected Sail makeSail(final Map<String, String> taskConfig) {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
    +
    +        // Move the configuration into a Rya Configuration object.
    +        final MongoDBRdfConfiguration ryaConfig = new MongoDBRdfConfiguration();
    +        ConfigUtils.setUseMongo(ryaConfig, true);
    +        ryaConfig.setMongoDBName( config.getRyaInstanceName() );
    +        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
    +        ryaConfig.setMongoHostname( config.getHostname() );
    +        ryaConfig.setMongoPort( "" + config.getPort() );
    +
    +        if(!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) {
    +            ryaConfig.setMongoUser( config.getUsername() );
    +            ryaConfig.setMongoPassword( config.getPassword() );
    +        }
    +
    +        // Create the Sail object.
    +        try {
    +            return RyaSailFactory.getInstance(ryaConfig);
    --- End diff --
    
    Could you be more specific? Do you mean add a geo profile that bundles the geo jars into this project?


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187736164
  
    --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java ---
    @@ -0,0 +1,94 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.mongo;
    +
    +import java.util.Map;
    +
    +import org.apache.kafka.common.config.ConfigDef;
    +import org.apache.kafka.common.config.ConfigDef.Importance;
    +import org.apache.kafka.common.config.ConfigDef.Type;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class MongoRyaSinkConfig extends RyaSinkConfig {
    +
    +    public static final String HOSTNAME = "mongo.hostname";
    +    private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections wlll use.";
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187667486
  
    --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java ---
    @@ -0,0 +1,122 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.mongo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Arrays;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.mongo.MongoConnectionDetails;
    +import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.indexing.accumulo.ConfigUtils;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.mongodb.MongoDBRdfConfiguration;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.MongoClient;
    +import com.mongodb.MongoCredential;
    +import com.mongodb.ServerAddress;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class MongoRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
    +        @Nullable
    +        final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
    +        @Nullable
    +        final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
    +
    +        // Connect a Mongo Client to the configured Mongo DB instance.
    +        final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
    +        final boolean hasCredentials = username != null && password != null;
    +
    +        try(MongoClient mongoClient = hasCredentials ?
    +                new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
    +                new MongoClient(serverAddr)) {
    +            // Use a RyaClient to see if the configured instance exists.
    +            // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
    +            final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
    +                    config.getHostname(),
    +                    config.getPort(),
    +                    Optional.ofNullable(username),
    +                    Optional.ofNullable(password));
    +
    +            final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    +            }
    +        } catch(final RyaClientException e) {
    +            throw new ConnectException("Unable to determine if the Rya Instance named " +
    +                    config.getRyaInstanceName() + " has been installed.", e);
    +        }
    +    }
    +
    +    @Override
    +    protected Sail makeSail(final Map<String, String> taskConfig) {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
    +
    +        // Move the configuration into a Rya Configuration object.
    +        final MongoDBRdfConfiguration ryaConfig = new MongoDBRdfConfiguration();
    +        ConfigUtils.setUseMongo(ryaConfig, true);
    +        ryaConfig.setMongoDBName( config.getRyaInstanceName() );
    +        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
    +        ryaConfig.setMongoHostname( config.getHostname() );
    +        ryaConfig.setMongoPort( "" + config.getPort() );
    +
    +        if(!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) {
    +            ryaConfig.setMongoUser( config.getUsername() );
    +            ryaConfig.setMongoPassword( config.getPassword() );
    +        }
    +
    +        // Create the Sail object.
    +        try {
    +            return RyaSailFactory.getInstance(ryaConfig);
    --- End diff --
    
    We can ignore geo here for now.  See comment up above.


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187628752
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    --- End diff --
    
    Missing `final` on the catch parameter.
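
    For reference, a small self-contained sketch of the style being asked for. The parameter of a multi-catch clause is already implicitly final in Java, so the keyword changes nothing at runtime; it only keeps the clause consistent with the other catch blocks in this PR. The class and method names below are made up for illustration.

    ```java
    // Hypothetical example class, unrelated to the Rya code base.
    public class MultiCatchSketch {

        /** Parses an integer and reports which exception type was caught on failure. */
        public static String describe(final String input) {
            try {
                return "parsed " + Integer.parseInt(input.trim());
            } catch (final NumberFormatException | NullPointerException e) {
                // 'e' cannot be reassigned here, with or without the explicit keyword.
                return "failed: " + e.getClass().getSimpleName();
            }
        }
    }
    ```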


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187739254
  
    --- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java ---
    @@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() {
             super();
         }
     
    -    public AccumuloRdfConfiguration(Configuration other) {
    +    public AccumuloRdfConfiguration(final Configuration other) {
             super(other);
         }
     
    -    public AccumuloRdfConfigurationBuilder getBuilder() {
    +    public static AccumuloRdfConfigurationBuilder getBuilder() {
    --- End diff --
    
    👍 


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187638302
  
    --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java ---
    @@ -0,0 +1,122 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.mongo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Arrays;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.mongo.MongoConnectionDetails;
    +import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.indexing.accumulo.ConfigUtils;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.mongodb.MongoDBRdfConfiguration;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.MongoClient;
    +import com.mongodb.MongoCredential;
    +import com.mongodb.ServerAddress;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +import edu.umd.cs.findbugs.annotations.Nullable;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class MongoRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
    +        @Nullable
    +        final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
    +        @Nullable
    +        final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
    +
    +        // Connect a Mongo Client to the configured Mongo DB instance.
    +        final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
    +        final boolean hasCredentials = username != null && password != null;
    +
    +        try(MongoClient mongoClient = hasCredentials ?
    +                new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
    +                new MongoClient(serverAddr)) {
    +            // Use a RyaClient to see if the configured instance exists.
    +            // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
    +            final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
    +                    config.getHostname(),
    +                    config.getPort(),
    +                    Optional.ofNullable(username),
    +                    Optional.ofNullable(password));
    +
    +            final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    --- End diff --
    
    LogUtils.clean(config.getRyaInstanceName())
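
To illustrate the suggestion above, here is a minimal sketch of cleaning a configured value before it is embedded in an exception or log message. The class LogSanitizer and both of its methods are hypothetical and written only for this example; they are not Rya's LogUtils, whose exact behavior is not shown in this thread. The sketch assumes the goal is to keep control characters such as CR and LF out of the message.

```java
/**
 * Hypothetical helper, for illustration only. This is NOT Rya's LogUtils;
 * it only sketches the kind of cleaning the review comment asks for.
 */
final class LogSanitizer {

    private LogSanitizer() { }

    /**
     * Replaces every ISO control character (including CR and LF) with an
     * underscore so a configured value cannot inject extra lines into a
     * log entry or exception message.
     */
    static String clean(final String value) {
        if (value == null) {
            return "null";
        }
        final StringBuilder sb = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            final char c = value.charAt(i);
            sb.append(Character.isISOControl(c) ? '_' : c);
        }
        return sb.toString();
    }

    /** Builds the "not installed" message from the diff, cleaning the name first. */
    static String notInstalledMessage(final String ryaInstanceName) {
        return "The Rya Instance named " + clean(ryaInstanceName) + " has not been installed.";
    }
}
```

With a helper like this, the throw statement in the diff would pass the cleaned name into the ConnectException message instead of the raw configuration value.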


---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187734448
  
    --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java ---
    @@ -0,0 +1,77 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.api;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.eclipse.rdf4j.model.Statement;
    +import org.eclipse.rdf4j.rio.RDFWriter;
    +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
    + * using the RDF4J Rio Binary format.
    --- End diff --
    
    I think saying that I'm using the RDF4J Rio Binary format is more useful than describing how I go about it, since the code itself already shows that.


---

[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/incubator-rya/pull/296
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/757/

    Build result: FAILURE

    [...truncated 91.94 MB...]
    [INFO] Apache Rya Web Projects ............................ SKIPPED
    [INFO] Apache Rya Web Implementation ...................... SKIPPED
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD FAILURE
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 46:04 min
    [INFO] Finished at: 2018-05-10T18:10:41+00:00
    [INFO] Final Memory: 519M/2971M
    [INFO] ------------------------------------------------------------------------
    Waiting for Jenkins to finish collecting data
    [ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.11:check (check-licenses) on project rya.kafka.connect.parent: Too many files with unapproved license: 1 See RAT report in: /home/jenkins/jenkins-slave/workspace/incubator-rya-master-with-optionals-pull-requests/extras/kafka.connect/target/rat.txt -> [Help 1]
    [ERROR]
    [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
    [ERROR] Re-run Maven using the -X switch to enable full debug logging.
    [ERROR]
    [ERROR] For more information about the errors and possible solutions, please read the following articles:
    [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
    [ERROR]
    [ERROR] After correcting the problems, you can resume the build with the command
    [ERROR]   mvn <goals> -rf :rya.kafka.connect.parent
    channel stopped
    Setting status of c2d802bd51d4d6951aa54e48b4038d64462bd8fd to FAILURE with url https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/757/ and message: 'FAILURE '
    Using context: Jenkins: clean package -Pgeoindexing



---

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/296#discussion_r187727065
  
    --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.kafka.connect.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Map;
    +
    +import org.apache.accumulo.core.client.AccumuloException;
    +import org.apache.accumulo.core.client.AccumuloSecurityException;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.Instance;
    +import org.apache.accumulo.core.client.ZooKeeperInstance;
    +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    +import org.apache.kafka.connect.errors.ConnectException;
    +import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
    +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    +import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
    +import org.apache.rya.sail.config.RyaSailFactory;
    +import org.eclipse.rdf4j.sail.Sail;
    +import org.eclipse.rdf4j.sail.SailException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AccumuloRyaSinkTask extends RyaSinkTask {
    +
    +    @Override
    +    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Connect to the instance of Accumulo.
    +        final Connector connector;
    +        try {
    +            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
    +            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
    +        } catch (AccumuloException | AccumuloSecurityException e) {
    +            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
    +        }
    +
    +        // Use a RyaClient to see if the configured instance exists.
    +        try {
    +            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    +                    config.getUsername(),
    +                    config.getPassword().toCharArray(),
    +                    config.getClusterName(),
    +                    config.getZookeepers());
    +            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
    +
    +            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
    +                throw new ConnectException("The Rya Instance named " +
    +                        config.getRyaInstanceName() + " has not been installed.");
    +            }
    +
    +        } catch (final RyaClientException e) {
    +            throw new ConnectException("Unable to determine if the Rya Instance named " +
    +                    config.getRyaInstanceName() + " has been installed.", e);
    +        }
    +    }
    +
    +    @Override
    +    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
    +        requireNonNull(taskConfig);
    +
    +        // Parse the configuration object.
    +        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
    +
    +        // Move the configuration into a Rya Configuration object.
    +        final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration();
    +        ryaConfig.setTablePrefix( config.getRyaInstanceName() );
    +        ryaConfig.setAccumuloZookeepers( config.getZookeepers() );
    +        ryaConfig.setAccumuloInstance( config.getClusterName() );
    +        ryaConfig.setAccumuloUser( config.getUsername() );
    +        ryaConfig.setAccumuloPassword( config.getPassword() );
    +
    +        // Create the Sail object.
    +        try {
    +            return RyaSailFactory.getInstance(ryaConfig);
    --- End diff --
    
    I'll add that to the future work section of the manual.


---