You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/11/07 15:50:44 UTC
[5/5] incubator-rya git commit: RYA-356 Added a Twill App for running
the periodic service. Closes #248.
RYA-356 Added a Twill App for running the periodic service. Closes #248.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/8acd24b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/8acd24b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/8acd24b5
Branch: refs/heads/master
Commit: 8acd24b5ec477f7943453e74d753dab03be99352
Parents: b372ebc
Author: jdasch <hc...@gmail.com>
Authored: Thu Sep 7 16:57:13 2017 -0400
Committer: Caleb Meier <ca...@parsons.com>
Committed: Tue Nov 7 07:49:12 2017 -0800
----------------------------------------------------------------------
common/rya.api/pom.xml | 10 +-
.../apache/rya/api/client/LoadStatements.java | 41 ++
.../org/apache/rya/api/client/RyaClient.java | 18 +-
.../org/apache/rya/api/resolver/RyaContext.java | 65 +--
extras/indexing/pom.xml | 5 +
.../client/accumulo/AccumuloLoadStatements.java | 123 +++++
.../accumulo/AccumuloRyaClientFactory.java | 1 +
.../pcj/matching/AccumuloIndexSetProvider.java | 15 +-
extras/periodic.notification/api/pom.xml | 48 +-
.../api/PeriodicNotificationClient.java | 17 +-
.../KafkaNotificationRegistrationClient.java | 34 +-
.../serialization/BindingSetSerDe.java | 9 +-
.../CommandNotificationSerializer.java | 30 +-
extras/periodic.notification/pom.xml | 55 +--
extras/periodic.notification/service/pom.xml | 72 ++-
.../PeriodicNotificationApplication.java | 76 +++-
...dicNotificationApplicationConfiguration.java | 142 +++---
.../PeriodicNotificationApplicationFactory.java | 81 ++--
.../exporter/KafkaExporterExecutor.java | 41 +-
.../KafkaPeriodicBindingSetExporter.java | 57 ++-
.../NotificationProcessorExecutor.java | 49 +-
.../TimestampedNotificationProcessor.java | 82 ++--
.../notification/pruner/AccumuloBinPruner.java | 28 +-
.../notification/pruner/FluoBinPruner.java | 26 +-
.../pruner/PeriodicQueryPruner.java | 64 +--
.../pruner/PeriodicQueryPrunerExecutor.java | 31 +-
.../kafka/KafkaNotificationProvider.java | 29 +-
.../kafka/PeriodicNotificationConsumer.java | 44 +-
extras/periodic.notification/tests/pom.xml | 30 +-
.../PeriodicNotificationApplicationIT.java | 2 +-
.../src/test/resources/notification.properties | 25 +-
.../periodic.notification/twill.yarn/README.md | 18 +
extras/periodic.notification/twill.yarn/pom.xml | 98 ++++
.../src/main/assembly/binary-release.xml | 30 ++
.../src/main/assembly/component-release.xml | 104 +++++
.../src/main/config/hadoop/core-site.xml | 25 ++
.../src/main/config/hadoop/yarn-site.xml | 25 ++
.../twill.yarn/src/main/config/logback.xml | 57 +++
.../src/main/config/notification.properties | 67 +++
.../twill.yarn/src/main/config/twill-env.sh | 63 +++
.../yarn/PeriodicNotificationTwillRunner.java | 315 +++++++++++++
.../scripts/periodicNotificationTwillApp.sh | 32 ++
extras/periodic.notification/twill/README.md | 36 ++
extras/periodic.notification/twill/pom.xml | 177 ++++++++
.../twill/PeriodicNotificationTwillApp.java | 57 +++
.../PeriodicNotificationTwillRunnable.java | 119 +++++
extras/rya.benchmark/README.md | 77 ++++
extras/rya.benchmark/pom.xml | 25 +-
.../src/main/assembly/binary-release.xml | 33 ++
.../src/main/assembly/component-release.xml | 81 ++++
.../src/main/config/common.options | 44 ++
.../src/main/config/log4j.properties | 41 ++
.../src/main/config/periodic.options | 49 ++
.../src/main/config/projection.options | 36 ++
.../benchmark/periodic/BenchmarkOptions.java | 78 ++++
.../periodic/BenchmarkStatementGenerator.java | 90 ++++
.../rya/benchmark/periodic/CommonOptions.java | 117 +++++
.../periodic/KafkaLatencyBenchmark.java | 445 +++++++++++++++++++
.../periodic/PeriodicQueryCommand.java | 70 +++
.../periodic/ProjectionQueryCommand.java | 31 ++
.../scripts/periodicNotificationBenchmark.sh | 32 ++
.../scripts/projectionNotificationBenchmark.sh | 32 ++
extras/rya.export/export.client/conf/config.xml | 18 +-
.../AccumuloPeriodicQueryResultStorage.java | 95 ++--
.../rya.manual/src/site/markdown/pcj-updater.md | 18 +-
extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 5 +-
.../pcj/fluo/app/FilterResultUpdater.java | 14 +-
.../app/batch/AbstractSpanBatchInformation.java | 4 +-
.../fluo/app/batch/JoinBatchInformation.java | 9 +-
.../export/kafka/KafkaBindingSetExporter.java | 14 +-
.../kafka/KafkaBindingSetExporterFactory.java | 14 +-
.../export/kafka/KafkaExportParameterBase.java | 5 +-
.../export/kafka/KafkaRyaSubGraphExporter.java | 23 +-
.../kafka/KafkaRyaSubGraphExporterFactory.java | 13 +-
.../KryoVisibilityBindingSetSerializer.java | 46 +-
.../rya/RyaBindingSetExporterFactory.java | 5 +-
.../fluo/app/observers/QueryResultObserver.java | 37 +-
.../pcj/fluo/app/observers/TripleObserver.java | 15 +-
.../indexing/pcj/fluo/app/query/FluoQuery.java | 84 ++--
.../fluo/app/query/FluoQueryMetadataDAO.java | 68 +--
.../pcj/fluo/integration/KafkaExportIT.java | 25 +-
.../org/apache/rya/shell/RyaAdminCommands.java | 3 +-
pom.xml | 74 ++-
83 files changed, 3619 insertions(+), 824 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/common/rya.api/pom.xml
----------------------------------------------------------------------
diff --git a/common/rya.api/pom.xml b/common/rya.api/pom.xml
index 65ef381..a683507 100644
--- a/common/rya.api/pom.xml
+++ b/common/rya.api/pom.xml
@@ -32,6 +32,10 @@ under the License.
<dependencies>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.calrissian.mango</groupId>
<artifactId>mango-core</artifactId>
</dependency>
@@ -60,7 +64,6 @@ under the License.
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
- <version>2.8.0</version>
</dependency>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
@@ -71,10 +74,9 @@ under the License.
<artifactId>jcip-annotations</artifactId>
</dependency>
<dependency>
- <groupId>com.esotericsoftware.kryo</groupId>
+ <groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
- <version>2.24.0</version>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatements.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatements.java b/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatements.java
new file mode 100644
index 0000000..2fdb77b
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatements.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+import org.openrdf.model.Statement;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Rya client command that loads RDF statements into a Rya instance.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface LoadStatements {
+
+    /**
+     * Adds the supplied RDF statements to the named instance of Rya.
+     *
+     * @param ryaInstanceName - Names the Rya instance that receives the statements. (not null)
+     * @param statements - The {@link Statement}s to add to that instance. (not null)
+     * @throws InstanceDoesNotExistException The name does not identify an existing Rya instance.
+     * @throws RyaClientException The command could not be completed.
+     */
+    void loadStatements(String ryaInstanceName, Iterable<? extends Statement> statements) throws InstanceDoesNotExistException, RyaClientException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
index c04bd86..1278193 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
@@ -44,6 +44,7 @@ public class RyaClient {
private final AddUser addUser;
private final RemoveUser removeUser;
private final Uninstall uninstall;
+ private final LoadStatements loadStatements;
private final LoadStatementsFile loadStatementsFile;
private final ExecuteSparqlQuery executeSparqlQuery;
@@ -64,6 +65,7 @@ public class RyaClient {
final AddUser addUser,
final RemoveUser removeUser,
final Uninstall uninstall,
+ final LoadStatements loadStatements,
final LoadStatementsFile loadStatementsFile,
final ExecuteSparqlQuery executeSparqlQuery) {
this.install = requireNonNull(install);
@@ -79,6 +81,7 @@ public class RyaClient {
this.addUser = requireNonNull(addUser);
this.removeUser = requireNonNull(removeUser);
this.uninstall = requireNonNull(uninstall);
+ this.loadStatements = requireNonNull(loadStatements);
this.loadStatementsFile = requireNonNull(loadStatementsFile);
this.executeSparqlQuery = requireNonNull(executeSparqlQuery);
}
@@ -105,10 +108,10 @@ public class RyaClient {
public DeletePCJ getDeletePCJ() {
return deletePcj;
}
-
+
/**
* @return An instance of {@link CreatePeriodicPCJ} that is connected to a Rya Periodic Storage
- */
+ */
public CreatePeriodicPCJ getCreatePeriodicPCJ() {
return createPeriodicPcj;
}
@@ -119,7 +122,7 @@ public class RyaClient {
public DeletePeriodicPCJ getDeletePeriodicPCJ() {
return deletePeriodicPcj;
}
-
+
/**
* @return An instance of {@link ListIncrementalQueries} for displaying queries that are incrementally
* maintained by the Rya instance
@@ -127,7 +130,7 @@ public class RyaClient {
public ListIncrementalQueries getListIncrementalQueries() {
return listIncrementalQueries;
}
-
+
/**
* @return An instance of {@link BatchUpdatePCJ} that is connected to a Rya storage
* if the Rya instance supports PCJ indexing.
@@ -179,6 +182,13 @@ public class RyaClient {
}
/**
+ * @return An instance of {@link LoadStatements} that is connected to a Rya storage.
+ */
+ public LoadStatements getLoadStatements() {
+ return loadStatements;
+ }
+
+ /**
* @return An instance of {@link LoadStatementsFile} that is connected to a Rya storage.
*/
public LoadStatementsFile getLoadStatementsFile() {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java b/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java
index d19226e..49fc5d1 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java
@@ -8,9 +8,9 @@ package org.apache.rya.api.resolver;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,11 +39,10 @@ import org.apache.rya.api.resolver.impl.RyaTypeResolverImpl;
import org.apache.rya.api.resolver.impl.RyaURIResolver;
import org.apache.rya.api.resolver.impl.ServiceBackedRyaTypeResolverMappings;
import org.apache.rya.api.resolver.impl.ShortRyaTypeResolver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.openrdf.model.URI;
import org.openrdf.model.vocabulary.XMLSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Date: 7/16/12
@@ -51,10 +50,10 @@ import org.openrdf.model.vocabulary.XMLSchema;
*/
public class RyaContext {
- public Log logger = LogFactory.getLog(RyaContext.class);
+ public Logger logger = LoggerFactory.getLogger(RyaContext.class);
- private Map<URI, RyaTypeResolver> uriToResolver = new HashMap<URI, RyaTypeResolver>();
- private Map<Byte, RyaTypeResolver> byteToResolver = new HashMap<Byte, RyaTypeResolver>();
+ private final Map<URI, RyaTypeResolver> uriToResolver = new HashMap<URI, RyaTypeResolver>();
+ private final Map<Byte, RyaTypeResolver> byteToResolver = new HashMap<Byte, RyaTypeResolver>();
private RyaTypeResolver defaultResolver = new CustomDatatypeResolver();
private RyaContext() {
@@ -91,47 +90,51 @@ public class RyaContext {
public synchronized static RyaContext getInstance() {
return RyaContextHolder.INSTANCE;
}
-
+
//need to go from datatype->resolver
- public RyaTypeResolver retrieveResolver(URI datatype) {
- RyaTypeResolver ryaTypeResolver = uriToResolver.get(datatype);
- if (ryaTypeResolver == null) return defaultResolver;
+ public RyaTypeResolver retrieveResolver(final URI datatype) {
+ final RyaTypeResolver ryaTypeResolver = uriToResolver.get(datatype);
+ if (ryaTypeResolver == null) {
+ return defaultResolver;
+ }
return ryaTypeResolver;
}
//need to go from byte->resolver
- public RyaTypeResolver retrieveResolver(byte markerByte) {
- RyaTypeResolver ryaTypeResolver = byteToResolver.get(markerByte);
- if (ryaTypeResolver == null) return defaultResolver;
+ public RyaTypeResolver retrieveResolver(final byte markerByte) {
+ final RyaTypeResolver ryaTypeResolver = byteToResolver.get(markerByte);
+ if (ryaTypeResolver == null) {
+ return defaultResolver;
+ }
return ryaTypeResolver;
}
- public byte[] serialize(RyaType ryaType) throws RyaTypeResolverException {
- RyaTypeResolver ryaTypeResolver = retrieveResolver(ryaType.getDataType());
+ public byte[] serialize(final RyaType ryaType) throws RyaTypeResolverException {
+ final RyaTypeResolver ryaTypeResolver = retrieveResolver(ryaType.getDataType());
if (ryaTypeResolver != null) {
return ryaTypeResolver.serialize(ryaType);
}
return null;
}
- public byte[][] serializeType(RyaType ryaType) throws RyaTypeResolverException {
- RyaTypeResolver ryaTypeResolver = retrieveResolver(ryaType.getDataType());
+ public byte[][] serializeType(final RyaType ryaType) throws RyaTypeResolverException {
+ final RyaTypeResolver ryaTypeResolver = retrieveResolver(ryaType.getDataType());
if (ryaTypeResolver != null) {
return ryaTypeResolver.serializeType(ryaType);
}
return null;
}
- public RyaType deserialize(byte[] bytes) throws RyaTypeResolverException {
- RyaTypeResolver ryaTypeResolver = retrieveResolver(bytes[bytes.length - 1]);
+ public RyaType deserialize(final byte[] bytes) throws RyaTypeResolverException {
+ final RyaTypeResolver ryaTypeResolver = retrieveResolver(bytes[bytes.length - 1]);
if (ryaTypeResolver != null) {
return ryaTypeResolver.deserialize(bytes);
}
return null;
}
- public void addRyaTypeResolverMapping(RyaTypeResolverMapping mapping) {
+ public void addRyaTypeResolverMapping(final RyaTypeResolverMapping mapping) {
if (!uriToResolver.containsKey(mapping.getRyaDataType())) {
if (logger.isDebugEnabled()) {
logger.debug("addRyaTypeResolverMapping uri:[" + mapping.getRyaDataType() + "] byte:[" + mapping.getMarkerByte() + "] for mapping[" + mapping + "]");
@@ -143,14 +146,14 @@ public class RyaContext {
}
}
- public void addRyaTypeResolverMappings(List<RyaTypeResolverMapping> mappings) {
- for (RyaTypeResolverMapping mapping : mappings) {
+ public void addRyaTypeResolverMappings(final List<RyaTypeResolverMapping> mappings) {
+ for (final RyaTypeResolverMapping mapping : mappings) {
addRyaTypeResolverMapping(mapping);
}
}
- public RyaTypeResolver removeRyaTypeResolver(URI dataType) {
- RyaTypeResolver ryaTypeResolver = uriToResolver.remove(dataType);
+ public RyaTypeResolver removeRyaTypeResolver(final URI dataType) {
+ final RyaTypeResolver ryaTypeResolver = uriToResolver.remove(dataType);
if (ryaTypeResolver != null) {
if (logger.isDebugEnabled()) {
logger.debug("Removing ryaType Resolver uri[" + dataType + "] + [" + ryaTypeResolver + "]");
@@ -161,8 +164,8 @@ public class RyaContext {
return null;
}
- public RyaTypeResolver removeRyaTypeResolver(byte markerByte) {
- RyaTypeResolver ryaTypeResolver = byteToResolver.remove(markerByte);
+ public RyaTypeResolver removeRyaTypeResolver(final byte markerByte) {
+ final RyaTypeResolver ryaTypeResolver = byteToResolver.remove(markerByte);
if (ryaTypeResolver != null) {
if (logger.isDebugEnabled()) {
logger.debug("Removing ryaType Resolver byte[" + markerByte + "] + [" + ryaTypeResolver + "]");
@@ -174,8 +177,8 @@ public class RyaContext {
}
//transform range
- public RyaRange transformRange(RyaRange range) throws RyaTypeResolverException {
- RyaTypeResolver ryaTypeResolver = retrieveResolver(range.getStart().getDataType());
+ public RyaRange transformRange(final RyaRange range) throws RyaTypeResolverException {
+ final RyaTypeResolver ryaTypeResolver = retrieveResolver(range.getStart().getDataType());
if (ryaTypeResolver != null) {
return ryaTypeResolver.transformRange(range);
}
@@ -186,7 +189,7 @@ public class RyaContext {
return defaultResolver;
}
- public void setDefaultResolver(RyaTypeResolver defaultResolver) {
+ public void setDefaultResolver(final RyaTypeResolver defaultResolver) {
this.defaultResolver = defaultResolver;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/indexing/pom.xml
----------------------------------------------------------------------
diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml
index 6fe35e9..7f3901e 100644
--- a/extras/indexing/pom.xml
+++ b/extras/indexing/pom.xml
@@ -149,6 +149,11 @@
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
+ <id>map-reduce</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>map-reduce</shadedClassifierName>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatements.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatements.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatements.java
new file mode 100644
index 0000000..9556bcf
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatements.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.LoadStatements;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.openrdf.model.Statement;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * An Accumulo implementation of the {@link LoadStatements} command.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloLoadStatements extends AccumuloCommand implements LoadStatements {
+    private static final Logger log = Logger.getLogger(AccumuloLoadStatements.class);
+
+    private final InstanceExists instanceExists;
+
+    /**
+     * Constructs an instance of {@link AccumuloLoadStatements}.
+     *
+     * @param connectionDetails - Details about the values that were used to create
+     *   the connector to the cluster. (not null)
+     * @param connector - Provides programmatic access to the instance of Accumulo
+     *   that hosts Rya instance. (not null)
+     */
+    public AccumuloLoadStatements(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
+        super(connectionDetails, connector);
+        instanceExists = new AccumuloInstanceExists(connectionDetails, connector);
+    }
+
+    @Override
+    public void loadStatements(final String ryaInstanceName, final Iterable<? extends Statement> statements) throws InstanceDoesNotExistException, RyaClientException {
+        requireNonNull(ryaInstanceName);
+        requireNonNull(statements);
+
+        // Fail fast, before any Sail resources are created, when the Rya instance does not exist.
+        if(!instanceExists.exists(ryaInstanceName)) {
+            throw new InstanceDoesNotExistException(String.format("There is no Rya instance named '%s'.", ryaInstanceName));
+        }
+
+        Sail sail = null;
+        SailRepository sailRepo = null;
+        SailRepositoryConnection sailRepoConn = null;
+
+        try {
+            // Get a Sail object that is connected to the Rya instance.
+            final AccumuloRdfConfiguration ryaConf = getAccumuloConnectionDetails().buildAccumuloRdfConfiguration(ryaInstanceName);
+            ryaConf.setFlush(false); //RYA-327 should address this hardcoded value.
+            sail = RyaSailFactory.getInstance(ryaConf);
+
+            // Load the statements through a repository connection backed by that Sail.
+            sailRepo = new SailRepository(sail);
+            sailRepoConn = sailRepo.getConnection();
+            sailRepoConn.add(statements);
+
+        } catch (final SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) {
+            log.warn("Exception while loading:", e);
+            throw new RyaClientException("A problem connecting to the Rya instance named '" + ryaInstanceName + "' has caused the load to fail.", e);
+        } catch (final Exception e) {
+            log.warn("Exception while loading:", e);
+            throw new RyaClientException("A problem processing the RDF statements has caused the load into Rya instance named " + ryaInstanceName + " to fail.", e);
+        } finally {
+            // Shut down in reverse order of creation; a caught failure is only logged so the remaining resources are still released.
+            if(sailRepoConn != null) {
+                try {
+                    sailRepoConn.close();
+                } catch (final RepositoryException e) {
+                    log.warn("Couldn't close the SailRepoConnection that is attached to the Rya instance.", e);
+                }
+            }
+            if(sailRepo != null) {
+                try {
+                    sailRepo.shutDown();
+                } catch (final RepositoryException e) {
+                    log.warn("Couldn't shut down the SailRepository that is attached to the Rya instance.", e);
+                }
+            }
+            if(sail != null) {
+                try {
+                    sail.shutDown();
+                } catch (final SailException e) {
+                    log.warn("Couldn't shut down the Sail that is attached to the Rya instance.", e);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
index d9bf644..fcc712c 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
@@ -62,6 +62,7 @@ public class AccumuloRyaClientFactory {
new AccumuloAddUser(connectionDetails, connector),
new AccumuloRemoveUser(connectionDetails, connector),
new AccumuloUninstall(connectionDetails, connector),
+ new AccumuloLoadStatements(connectionDetails, connector),
new AccumuloLoadStatementsFile(connectionDetails, connector),
new AccumuloExecuteSparqlQuery(connectionDetails, connector));
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
index 40e2c77..4a15665 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
@@ -18,12 +18,11 @@
*/
package org.apache.rya.indexing.pcj.matching;
-import static java.util.Objects.requireNonNull;
-
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -73,13 +72,11 @@ public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTup
private boolean init = false;
public AccumuloIndexSetProvider(final Configuration conf) {
- Preconditions.checkNotNull(conf);
- this.conf = conf;
+ this.conf = Objects.requireNonNull(conf);
}
public AccumuloIndexSetProvider(final Configuration conf, final List<ExternalTupleSet> indices) {
- Preconditions.checkNotNull(conf);
- this.conf = conf;
+ this(conf);
indexCache = indices;
init = true;
}
@@ -155,9 +152,9 @@ public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTup
*/
private List<ExternalTupleSet> getAccIndices() throws Exception {
- requireNonNull(conf);
- final String tablePrefix = requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX));
- final Connector conn = requireNonNull(ConfigUtils.getConnector(conf));
+ Objects.requireNonNull(conf);
+ final String tablePrefix = Objects.requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX));
+ final Connector conn = Objects.requireNonNull(ConfigUtils.getConnector(conf));
List<String> tables = null;
if (conf instanceof RdfCloudTripleStoreConfiguration) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/pom.xml b/extras/periodic.notification/api/pom.xml
index 9f62e73..01d5c60 100644
--- a/extras/periodic.notification/api/pom.xml
+++ b/extras/periodic.notification/api/pom.xml
@@ -1,15 +1,23 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <!-- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with this
- work for additional information regarding copyright ownership. The ASF licenses
- this file to you under the Apache License, Version 2.0 (the "License"); you
- may not use this file except in compliance with the License. You may obtain
- a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
- required by applicable law or agreed to in writing, software distributed
- under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
- OR CONDITIONS OF ANY KIND, either express or implied. See the License for
- the specific language governing permissions and limitations under the License. -->
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.rya</groupId>
@@ -23,16 +31,13 @@
<description>API for Periodic Notification Applications</description>
<dependencies>
-
<dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>2.8.0</version>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.openrdf.sesame</groupId>
@@ -46,6 +51,13 @@
<groupId>org.apache.rya</groupId>
<artifactId>rya.indexing.pcj</artifactId>
</dependency>
+
+ <!-- testing dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
index ff08733..5a473d2 100644
--- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
@@ -36,20 +36,20 @@ public interface PeriodicNotificationClient extends AutoCloseable {
* Adds a new notification to be registered with the {@link NotificationCoordinatorExecutor}
* @param notification - notification to be added
*/
- public void addNotification(PeriodicNotification notification);
-
+ void addNotification(PeriodicNotification notification);
+
/**
* Deletes a notification from the {@link NotificationCoordinatorExecutor}.
* @param notification - notification to be deleted
*/
- public void deleteNotification(BasicNotification notification);
-
+ void deleteNotification(BasicNotification notification);
+
/**
* Deletes a notification from the {@link NotificationCoordinatorExecutor}.
* @param notificationId - id corresponding to the notification to be deleted
*/
- public void deleteNotification(String notificationId);
-
+ void deleteNotification(String notificationId);
+
/**
* Adds a new notification with the indicated id and period to the {@link NotificationCoordinatorExecutor}
* @param id - Periodic Query id
@@ -57,8 +57,5 @@ public interface PeriodicNotificationClient extends AutoCloseable {
* @param delay - initial delay for starting periodic notifications
* @param unit - time unit of delay and period
*/
- public void addNotification(String id, long period, long delay, TimeUnit unit);
-
- public void close();
-
+ void addNotification(String id, long period, long delay, TimeUnit unit);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
index bb438be..b022d3e 100644
--- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
@@ -31,50 +31,50 @@ import org.apache.rya.periodic.notification.notification.PeriodicNotification;
/**
* Implementation of {@link PeriodicNotificationClient} used to register new notification
- * requests with the PeriodicQueryService.
+ * requests with the PeriodicQueryService.
*
*/
public class KafkaNotificationRegistrationClient implements PeriodicNotificationClient {
- private KafkaProducer<String, CommandNotification> producer;
- private String topic;
-
- public KafkaNotificationRegistrationClient(String topic, KafkaProducer<String, CommandNotification> producer) {
+ private final KafkaProducer<String, CommandNotification> producer;
+ private final String topic;
+
+ public KafkaNotificationRegistrationClient(final String topic, final KafkaProducer<String, CommandNotification> producer) {
this.topic = topic;
this.producer = producer;
}
-
+
@Override
- public void addNotification(PeriodicNotification notification) {
+ public void addNotification(final PeriodicNotification notification) {
processNotification(new CommandNotification(Command.ADD, notification));
}
@Override
- public void deleteNotification(BasicNotification notification) {
+ public void deleteNotification(final BasicNotification notification) {
processNotification(new CommandNotification(Command.DELETE, notification));
}
@Override
- public void deleteNotification(String notificationId) {
+ public void deleteNotification(final String notificationId) {
processNotification(new CommandNotification(Command.DELETE, new BasicNotification(notificationId)));
}
@Override
- public void addNotification(String id, long period, long delay, TimeUnit unit) {
- Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build();
+ public void addNotification(final String id, final long period, final long delay, final TimeUnit unit) {
+ final Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build();
processNotification(new CommandNotification(Command.ADD, notification));
}
-
-
- private void processNotification(CommandNotification notification) {
+
+
+ private void processNotification(final CommandNotification notification) {
producer.send(new ProducerRecord<String, CommandNotification>(topic, notification.getId(), notification));
}
-
+
@Override
public void close() {
+ // TODO scoping issue. If we're closing this producer, we should also create it - otherwise other classes may be using it
+ // or we shouldn't implement autocloseable.
producer.close();
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
index 129bd6d..6db7b18 100644
--- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
@@ -25,12 +25,13 @@ import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.primitives.Bytes;
@@ -42,7 +43,7 @@ import com.google.common.primitives.Bytes;
*/
public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<BindingSet> {
- private static final Logger log = Logger.getLogger(BindingSetSerDe.class);
+ private static final Logger log = LoggerFactory.getLogger(BindingSetSerDe.class);
private static final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer();
private static final byte[] DELIM_BYTE = "\u0002".getBytes(StandardCharsets.UTF_8);
@@ -60,7 +61,7 @@ public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<Bin
final int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE);
final byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex);
final byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + 1, bsBytes.length);
- final VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";"));
+ final VariableOrder varOrder = new VariableOrder(new String(varOrderBytes, StandardCharsets.UTF_8).split(";"));
return getBindingSet(varOrder, bsBytesNoVarOrder);
} catch(final Exception e) {
log.trace("Unable to deserialize BindingSet: " + bsBytes);
@@ -75,7 +76,7 @@ public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<Bin
private byte[] getBytes(final VariableOrder varOrder, final BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException {
final byte[] bsBytes = serializer.convert(bs, varOrder);
final String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders());
- final byte[] varOrderBytes = varOrderString.getBytes("UTF-8");
+ final byte[] varOrderBytes = varOrderString.getBytes(StandardCharsets.UTF_8);
return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
index 302e1be..13c789f 100644
--- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
@@ -18,7 +18,7 @@
*/
package org.apache.rya.periodic.notification.serialization;
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParseException;
/**
* Kafka {@link Serializer} and {@link Deserializer} for producing and consuming {@link CommandNotification}s
@@ -43,22 +44,23 @@ public class CommandNotificationSerializer implements Serializer<CommandNotifica
private static final Logger LOG = LoggerFactory.getLogger(CommandNotificationSerializer.class);
@Override
- public CommandNotification deserialize(String topic, byte[] bytes) {
- String json = null;
+ public CommandNotification deserialize(final String topic, final byte[] bytes) {
try {
- json = new String(bytes, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- LOG.info("Unable to deserialize notification for topic: " + topic);
+ final String json = new String(bytes, StandardCharsets.UTF_8);
+ return gson.fromJson(json, CommandNotification.class);
+ } catch (final JsonParseException e) {
+ LOG.warn("Unable to deserialize notification for topic: " + topic);
+ throw new RuntimeException(e);
}
- return gson.fromJson(json, CommandNotification.class);
+
}
@Override
- public byte[] serialize(String topic, CommandNotification command) {
+ public byte[] serialize(final String topic, final CommandNotification command) {
try {
- return gson.toJson(command).getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- LOG.info("Unable to serialize notification: " + command + "for topic: " + topic);
+ return gson.toJson(command).getBytes(StandardCharsets.UTF_8);
+ } catch (final JsonParseException e) {
+ LOG.warn("Unable to serialize notification: " + command + "for topic: " + topic);
throw new RuntimeException(e);
}
}
@@ -67,10 +69,10 @@ public class CommandNotificationSerializer implements Serializer<CommandNotifica
public void close() {
// Do nothing. Nothing to close
}
-
+
@Override
- public void configure(Map<String, ?> arg0, boolean arg1) {
+ public void configure(final Map<String, ?> arg0, final boolean arg1) {
// Do nothing. Nothing to configure
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/pom.xml b/extras/periodic.notification/pom.xml
index c49db73..7610b88 100644
--- a/extras/periodic.notification/pom.xml
+++ b/extras/periodic.notification/pom.xml
@@ -1,40 +1,43 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
-->
- <modelVersion>4.0.0</modelVersion>
- <artifactId>rya.periodic.notification.parent</artifactId>
-
- <parent>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
<groupId>org.apache.rya</groupId>
<artifactId>rya.extras</artifactId>
<version>3.2.12-incubating-SNAPSHOT</version>
</parent>
-
- <name>Apache Rya Periodic Notification Parent</name>
- <description>Parent POM for Rya Periodic Notification Projects</description>
-
- <packaging>pom</packaging>
+
+ <artifactId>rya.periodic.notification.parent</artifactId>
+
+ <name>Apache Rya Periodic Notification Parent</name>
+ <description>Parent POM for Rya Periodic Notification Projects</description>
+
+ <packaging>pom</packaging>
<modules>
<module>api</module>
<module>service</module>
+ <module>twill</module>
+ <module>twill.yarn</module>
<module>tests</module>
</modules>
-
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/pom.xml b/extras/periodic.notification/service/pom.xml
index 2e61733..18aef13 100644
--- a/extras/periodic.notification/service/pom.xml
+++ b/extras/periodic.notification/service/pom.xml
@@ -1,15 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <!-- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with this
- work for additional information regarding copyright ownership. The ASF licenses
- this file to you under the Apache License, Version 2.0 (the "License"); you
- may not use this file except in compliance with the License. You may obtain
- a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
- required by applicable law or agreed to in writing, software distributed
- under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
- OR CONDITIONS OF ANY KIND, either express or implied. See the License for
- the specific language governing permissions and limitations under the License. -->
<parent>
<groupId>org.apache.rya</groupId>
<artifactId>rya.periodic.notification.parent</artifactId>
@@ -22,42 +31,20 @@
<description>Notifications for Rya Periodic Service</description>
<dependencies>
-
+ <!-- compile dependencies -->
<dependency>
- <groupId>org.apache.twill</groupId>
- <artifactId>twill-api</artifactId>
- <version>0.11.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.twill</groupId>
- <artifactId>twill-yarn</artifactId>
- <version>0.11.0</version>
- <exclusions>
- <exclusion>
- <artifactId>kafka_2.10</artifactId>
- <groupId>org.apache.kafka</groupId>
- </exclusion>
- </exclusions>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
- <version>2.8.0</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.fluo</groupId>
- <artifactId>fluo-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.rya</groupId>
<artifactId>rya.indexing</artifactId>
</dependency>
@@ -71,13 +58,22 @@
</dependency>
<dependency>
<groupId>org.apache.rya</groupId>
- <artifactId>rya.pcj.fluo.app</artifactId>
+ <artifactId>rya.periodic.notification.api</artifactId>
</dependency>
+
+ <!-- runtime dependencies -->
<dependency>
- <groupId>org.apache.rya</groupId>
- <artifactId>rya.periodic.notification.api</artifactId>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-core</artifactId>
+ <scope>runtime</scope>
</dependency>
+ <!-- testing dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
index 92a7d18..79abe2f 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
@@ -18,7 +18,10 @@
*/
package org.apache.rya.periodic.notification.application;
-import org.apache.log4j.Logger;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
import org.apache.rya.periodic.notification.api.BinPruner;
import org.apache.rya.periodic.notification.api.BindingSetRecord;
@@ -30,6 +33,8 @@ import org.apache.rya.periodic.notification.processor.NotificationProcessorExecu
import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
import org.openrdf.query.algebra.evaluation.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
@@ -68,36 +73,40 @@ import com.google.common.base.Preconditions;
* the period at which the user wants to receive updates, and the time unit. The
* following query requests all observations that occurred within the last
* minute and requests updates every 15 seconds. It also performs a count on
- * those observations. <br>
- * <br>
- * <li>prefix function: http://org.apache.rya/function#
- * <li>"prefix time: http://www.w3.org/2006/time#
- * <li>"select (count(?obs) as ?total) where {
- * <li>"Filter(function:periodic(?time, 1, .25, time:minutes))
- * <li>"?obs uri:hasTime ?time.
- * <li>"?obs uri:hasId ?id }
- * <li>
+ * those observations.
+ * <p>
+ * <pre>
+ * PREFIX function: http://org.apache.rya/function#
+ * PREFIX time: http://www.w3.org/2006/time#
+ * SELECT (count(?obs) as ?total) WHERE {
+ * FILTER (function:periodic(?time, 1, .25, time:minutes))
+ * ?obs uri:hasTime ?time.
+ * ?obs uri:hasId ?id
+ * }
+ * </pre>
*/
public class PeriodicNotificationApplication implements LifeCycle {
- private static final Logger log = Logger.getLogger(PeriodicNotificationApplication.class);
- private NotificationCoordinatorExecutor coordinator;
- private KafkaNotificationProvider provider;
- private PeriodicQueryPrunerExecutor pruner;
- private NotificationProcessorExecutor processor;
- private KafkaExporterExecutor exporter;
+ private static final Logger log = LoggerFactory.getLogger(PeriodicNotificationApplication.class);
+ private final NotificationCoordinatorExecutor coordinator;
+ private final KafkaNotificationProvider provider;
+ private final PeriodicQueryPrunerExecutor pruner;
+ private final NotificationProcessorExecutor processor;
+ private final KafkaExporterExecutor exporter;
private boolean running = false;
+ private Optional<CompletableFuture<Void>> finished = Optional.empty();
+
/**
* Creates a PeriodicNotificationApplication
- * @param provider - {@link KafkaNotificationProvider} that retrieves new Notificaiton requests from Kafka
+ * @param provider - {@link KafkaNotificationProvider} that retrieves new Notification requests from Kafka
* @param coordinator - {@link NotificationCoordinatorExecutor} that manages PeriodicNotifications.
* @param processor - {@link NotificationProcessorExecutor} that processes PeriodicNotifications
* @param exporter - {@link KafkaExporterExecutor} that exports periodic results
* @param pruner - {@link PeriodicQueryPrunerExecutor} that cleans up old periodic bins
*/
- public PeriodicNotificationApplication(KafkaNotificationProvider provider, NotificationCoordinatorExecutor coordinator,
- NotificationProcessorExecutor processor, KafkaExporterExecutor exporter, PeriodicQueryPrunerExecutor pruner) {
+ public PeriodicNotificationApplication(final KafkaNotificationProvider provider, final NotificationCoordinatorExecutor coordinator,
+ final NotificationProcessorExecutor processor, final KafkaExporterExecutor exporter, final PeriodicQueryPrunerExecutor pruner) {
this.provider = Preconditions.checkNotNull(provider);
this.coordinator = Preconditions.checkNotNull(coordinator);
this.processor = Preconditions.checkNotNull(processor);
@@ -115,18 +124,37 @@ public class PeriodicNotificationApplication implements LifeCycle {
pruner.start();
exporter.start();
running = true;
+ finished = Optional.of(new CompletableFuture<>());
+ }
+ }
+
+ /**
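+ * Blocks the current thread until another thread has called {@link #stop()}.
+ * @throws ExecutionException if the underlying future completed exceptionally
+ * @throws InterruptedException if the current thread is interrupted while waiting
+ * @throws IllegalStateException if the application has not been started yet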
+ */
+ public void blockUntilFinished() throws ExecutionException, InterruptedException, IllegalStateException {
+ if(finished.isPresent()) {
+ finished.get().get();
+ } else {
+ throw new IllegalStateException("Cannot block if the application has not been started yet");
}
}
@Override
public void stop() {
log.info("Stopping PeriodicNotificationApplication.");
+ if(!finished.isPresent()) {
+ throw new IllegalStateException("Cannot stop if the application has not been started yet");
+ }
provider.stop();
coordinator.stop();
processor.stop();
pruner.stop();
exporter.stop();
running = false;
+ finished.get().complete(null);
}
/**
@@ -154,7 +182,7 @@ public class PeriodicNotificationApplication implements LifeCycle {
* @param pruner - PeriodicQueryPrunerExecutor for cleaning up old periodic bins
* @return this Builder for chaining method calls
*/
- public Builder setPruner(PeriodicQueryPrunerExecutor pruner) {
+ public Builder setPruner(final PeriodicQueryPrunerExecutor pruner) {
this.pruner = pruner;
return this;
}
@@ -164,12 +192,12 @@ public class PeriodicNotificationApplication implements LifeCycle {
* @param provider - KafkaNotificationProvider for retrieving new periodic notification requests from Kafka
* @return this Builder for chaining method calls
*/
- public Builder setProvider(KafkaNotificationProvider provider) {
+ public Builder setProvider(final KafkaNotificationProvider provider) {
this.provider = provider;
return this;
}
- public Builder setProcessor(NotificationProcessorExecutor processor) {
+ public Builder setProcessor(final NotificationProcessorExecutor processor) {
this.processor = processor;
return this;
}
@@ -179,7 +207,7 @@ public class PeriodicNotificationApplication implements LifeCycle {
* @param exporter for exporting periodic query results to Kafka
* @return this Builder for chaining method calls
*/
- public Builder setExporter(KafkaExporterExecutor exporter) {
+ public Builder setExporter(final KafkaExporterExecutor exporter) {
this.exporter = exporter;
return this;
}
@@ -189,7 +217,7 @@ public class PeriodicNotificationApplication implements LifeCycle {
* @param coordinator for managing and generating periodic notifications
* @return this Builder for chaining method calls
*/
- public Builder setCoordinator(NotificationCoordinatorExecutor coordinator) {
+ public Builder setCoordinator(final NotificationCoordinatorExecutor coordinator) {
this.coordinator = coordinator;
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
index ff58979..9f0631d 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
@@ -18,35 +18,36 @@
*/
package org.apache.rya.periodic.notification.application;
+import java.util.Objects;
import java.util.Properties;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import com.google.common.base.Preconditions;
/**
* Configuration object for creating a {@link PeriodicNotificationApplication}.
*/
public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfConfiguration {
- public static String FLUO_APP_NAME = "fluo.app.name";
- public static String FLUO_TABLE_NAME = "fluo.table.name";
- public static String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
- public static String NOTIFICATION_TOPIC = "kafka.notification.topic";
- public static String NOTIFICATION_GROUP_ID = "kafka.notification.group.id";
- public static String NOTIFICATION_CLIENT_ID = "kafka.notification.client.id";
- public static String COORDINATOR_THREADS = "cep.coordinator.threads";
- public static String PRODUCER_THREADS = "cep.producer.threads";
- public static String EXPORTER_THREADS = "cep.exporter.threads";
- public static String PROCESSOR_THREADS = "cep.processor.threads";
- public static String PRUNER_THREADS = "cep.pruner.threads";
-
+ public static final String RYA_PERIODIC_PREFIX = "rya.periodic.notification.";
+ public static final String RYA_PCJ_PREFIX = "rya.pcj.";
+ public static final String FLUO_APP_NAME = RYA_PCJ_PREFIX +"fluo.app.name";
+ public static final String FLUO_TABLE_NAME = RYA_PCJ_PREFIX + "fluo.table.name";
+ public static final String KAFKA_BOOTSTRAP_SERVERS = RYA_PERIODIC_PREFIX + "kafka.bootstrap.servers";
+ public static final String NOTIFICATION_TOPIC = RYA_PERIODIC_PREFIX + "kafka.topic";
+ public static final String NOTIFICATION_GROUP_ID = RYA_PERIODIC_PREFIX + "kafka.group.id";
+ public static final String NOTIFICATION_CLIENT_ID = RYA_PERIODIC_PREFIX + "kafka.client.id";
+ public static final String COORDINATOR_THREADS = RYA_PERIODIC_PREFIX + "coordinator.threads";
+ public static final String PRODUCER_THREADS = RYA_PERIODIC_PREFIX + "producer.threads";
+ public static final String EXPORTER_THREADS = RYA_PERIODIC_PREFIX + "exporter.threads";
+ public static final String PROCESSOR_THREADS = RYA_PERIODIC_PREFIX + "processor.threads";
+ public static final String PRUNER_THREADS = RYA_PERIODIC_PREFIX + "pruner.threads";
+
public PeriodicNotificationApplicationConfiguration() {}
-
+
/**
* Creates a PeriodicNotificationApplicationConfiguration object from a Properties file. This method assumes
* that all values in the Properties file are Strings and that the Properties file uses the keys below.
- * See rya.cep/cep.integration.tests/src/test/resources/properties/notification.properties for an example.
* <br>
* <ul>
* <li>"accumulo.auths" - String of Accumulo authorizations. Default is empty String.
@@ -55,23 +56,22 @@ public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfCon
* <li>"accumulo.password" - Accumulo password (required)
* <li>"accumulo.rya.prefix" - Prefix for Accumulo backed Rya instance. Default is "rya_"
* <li>"accumulo.zookeepers" - Zookeepers for underlying Accumulo instance (required)
- * <li>"fluo.app.name" - Name of Fluo Application (required)
- * <li>"fluo.table.name" - Name of Fluo Table (required)
- * <li>"kafka.bootstrap.servers" - Kafka Bootstrap servers for Producers and Consumers (required)
- * <li>"kafka.notification.topic" - Topic to which new Periodic Notifications are published. Default is "notifications".
- * <li>"kafka.notification.client.id" - Client Id for notification topic. Default is "consumer0"
- * <li>"kafka.notification.group.id" - Group Id for notification topic. Default is "group0"
- * <li>"cep.coordinator.threads" - Number of threads used by coordinator. Default is 1.
- * <li>"cep.producer.threads" - Number of threads used by producer. Default is 1.
- * <li>"cep.exporter.threads" - Number of threads used by exporter. Default is 1.
- * <li>"cep.processor.threads" - Number of threads used by processor. Default is 1.
- * <li>"cep.pruner.threads" - Number of threads used by pruner. Default is 1.
+ * <li>"rya.pcj.fluo.app.name" - Name of Fluo Application (required)
+ * <li>"rya.pcj.fluo.table.name" - Name of Fluo Table (required)
+ * <li>"rya.periodic.notification.kafka.bootstrap.servers" - Kafka Bootstrap servers for Producers and Consumers (required)
+ * <li>"rya.periodic.notification.kafka.topic" - Topic to which new Periodic Notifications are published. Default is "notifications".
+ * <li>"rya.periodic.notification.kafka.client.id" - Client Id for notification topic. Default is "consumer0"
+ * <li>"rya.periodic.notification.kafka.group.id" - Group Id for notification topic. Default is "group0"
+ * <li>"rya.periodic.notification.coordinator.threads" - Number of threads used by coordinator. Default is 1.
+ * <li>"rya.periodic.notification.producer.threads" - Number of threads used by producer. Default is 1.
+ * <li>"rya.periodic.notification.exporter.threads" - Number of threads used by exporter. Default is 1.
+ * <li>"rya.periodic.notification.processor.threads" - Number of threads used by processor. Default is 1.
+ * <li>"rya.periodic.notification.pruner.threads" - Number of threads used by pruner. Default is 1.
* </ul>
* <br>
* @param props - Properties file containing Accumulo specific configuration parameters
- * @return AccumumuloRdfConfiguration with properties set
*/
- public PeriodicNotificationApplicationConfiguration(Properties props) {
+ public PeriodicNotificationApplicationConfiguration(final Properties props) {
super(fromProperties(props));
setFluoAppName(props.getProperty(FLUO_APP_NAME));
setFluoTableName(props.getProperty(FLUO_TABLE_NAME));
@@ -85,170 +85,170 @@ public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfCon
setPrunerThreads(Integer.parseInt(props.getProperty(PRUNER_THREADS, "1")));
setCoordinatorThreads(Integer.parseInt(props.getProperty(COORDINATOR_THREADS, "1")));
}
-
+
/**
* Sets the name of the Fluo Application
- * @param fluoAppName
+ * @param fluoAppName
*/
- public void setFluoAppName(String fluoAppName) {
- set(FLUO_APP_NAME, Preconditions.checkNotNull(fluoAppName));
+ public void setFluoAppName(final String fluoAppName) {
+ set(FLUO_APP_NAME, Objects.requireNonNull(fluoAppName));
}
-
+
/**
* Sets the name of the Fluo table
* @param fluoTableName
*/
- public void setFluoTableName(String fluoTableName) {
- set(FLUO_TABLE_NAME, Preconditions.checkNotNull(fluoTableName));
+ public void setFluoTableName(final String fluoTableName) {
+ set(FLUO_TABLE_NAME, Objects.requireNonNull(fluoTableName));
}
-
+
/**
* Sets the Kafka bootstrap servers
* @param bootStrapServers
*/
- public void setBootStrapServers(String bootStrapServers) {
- set(KAFKA_BOOTSTRAP_SERVERS, Preconditions.checkNotNull(bootStrapServers));
+ public void setBootStrapServers(final String bootStrapServers) {
+ set(KAFKA_BOOTSTRAP_SERVERS, Objects.requireNonNull(bootStrapServers));
}
-
+
/**
* Sets the Kafka topic name for new notification requests
* @param notificationTopic
*/
- public void setNotificationTopic(String notificationTopic) {
- set(NOTIFICATION_TOPIC, Preconditions.checkNotNull(notificationTopic));
+ public void setNotificationTopic(final String notificationTopic) {
+ set(NOTIFICATION_TOPIC, Objects.requireNonNull(notificationTopic));
}
-
+
/**
* Sets the GroupId for new notification request topic
* @param notificationGroupId
*/
- public void setNotificationGroupId(String notificationGroupId) {
- set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationGroupId));
+ public void setNotificationGroupId(final String notificationGroupId) {
+ set(NOTIFICATION_GROUP_ID, Objects.requireNonNull(notificationGroupId));
}
-
+
/**
* Sets the ClientId for the Kafka notification topic
* @param notificationClientId
*/
- public void setNotificationClientId(String notificationClientId) {
- set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationClientId));
+ public void setNotificationClientId(final String notificationClientId) {
+ set(NOTIFICATION_CLIENT_ID, Objects.requireNonNull(notificationClientId));
}
-
+
/**
* Sets the number of threads for the coordinator
* @param threads
*/
- public void setCoordinatorThreads(int threads) {
+ public void setCoordinatorThreads(final int threads) {
setInt(COORDINATOR_THREADS, threads);
}
-
+
/**
* Sets the number of threads for the exporter
* @param threads
*/
- public void setExporterThreads(int threads) {
+ public void setExporterThreads(final int threads) {
setInt(EXPORTER_THREADS, threads);
}
-
+
/**
* Sets the number of threads for the producer for reading new periodic notifications
* @param threads
*/
- public void setProducerThreads(int threads) {
+ public void setProducerThreads(final int threads) {
setInt(PRODUCER_THREADS, threads);
}
-
+
/**
* Sets the number of threads for the bin pruner
* @param threads
*/
- public void setPrunerThreads(int threads) {
+ public void setPrunerThreads(final int threads) {
setInt(PRUNER_THREADS, threads);
}
-
+
/**
* Sets the number of threads for the Notification processor
* @param threads
*/
- public void setProcessorThreads(int threads) {
+ public void setProcessorThreads(final int threads) {
setInt(PROCESSOR_THREADS, threads);
}
-
+
/**
* @return name of the Fluo application
*/
public String getFluoAppName() {
return get(FLUO_APP_NAME);
}
-
+
/**
* @return name of the Fluo table
*/
public String getFluoTableName() {
- return get(FLUO_TABLE_NAME);
+ return get(FLUO_TABLE_NAME);
}
-
+
/**
* @return Kafka bootstrap servers
*/
public String getBootStrapServers() {
- return get(KAFKA_BOOTSTRAP_SERVERS);
+ return get(KAFKA_BOOTSTRAP_SERVERS);
}
-
+
/**
* @return notification topic
*/
public String getNotificationTopic() {
return get(NOTIFICATION_TOPIC, "notifications");
}
-
+
/**
* @return Kafka GroupId for the notification topic
*/
public String getNotificationGroupId() {
return get(NOTIFICATION_GROUP_ID, "group0");
}
-
+
/**
* @return Kafka ClientId for the notification topic
*/
public String getNotificationClientId() {
return get(NOTIFICATION_CLIENT_ID, "consumer0");
}
-
+
/**
* @return the number of threads for the coordinator
*/
public int getCoordinatorThreads() {
return getInt(COORDINATOR_THREADS, 1);
}
-
+
/**
* @return the number of threads for the exporter
*/
public int getExporterThreads() {
return getInt(EXPORTER_THREADS, 1);
}
-
+
/**
* @return the number of threads for the notification producer
*/
public int getProducerThreads() {
return getInt(PRODUCER_THREADS, 1);
}
-
+
/**
* @return the number of threads for the bin pruner
*/
public int getPrunerThreads() {
return getInt(PRUNER_THREADS, 1);
}
-
+
/**
* @return number of threads for the processor
*/
public int getProcessorThreads() {
return getInt(PROCESSOR_THREADS, 1);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
index 771a4ab..fbc03f3 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
@@ -18,10 +18,13 @@
*/
package org.apache.rya.periodic.notification.application;
+import java.io.File;
+import java.io.FileInputStream;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -33,6 +36,7 @@ import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Snapshot;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
@@ -51,6 +55,9 @@ import org.apache.rya.periodic.notification.registration.kafka.KafkaNotification
import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
import org.openrdf.query.BindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Factory for creating a {@link PeriodicNotificationApplication}.
@@ -59,82 +66,88 @@ public class PeriodicNotificationApplicationFactory {
/**
* Create a PeriodicNotificationApplication.
- * @param props - Properties file that specifies the parameters needed to create the application
+ * @param conf - Configuration object that specifies the parameters needed to create the application
* @return PeriodicNotificationApplication to periodically poll Rya Fluo for new results
* @throws PeriodicApplicationException
*/
- public static PeriodicNotificationApplication getPeriodicApplication(Properties props) throws PeriodicApplicationException {
- PeriodicNotificationApplicationConfiguration conf = new PeriodicNotificationApplicationConfiguration(props);
- Properties kafkaProps = getKafkaProperties(conf);
+ public static PeriodicNotificationApplication getPeriodicApplication(final PeriodicNotificationApplicationConfiguration conf) throws PeriodicApplicationException {
+ final Properties kafkaConsumerProps = getKafkaConsumerProperties(conf);
+ final Properties kafkaProducerProps = getKafkaProducerProperties(conf);
- BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
- BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
- BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();
+ final BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+ final BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
+ final BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();
FluoClient fluo = null;
try {
- PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf);
+ final PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf);
fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf);
- NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications);
+ final NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications);
addRegisteredNotices(coordinator, fluo.newSnapshot());
- KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProps, bindingSets);
- PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins);
- NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads());
- KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaProps);
+ final KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProducerProps, bindingSets);
+ final PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins);
+ final NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads());
+ final KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaConsumerProps);
return PeriodicNotificationApplication.builder().setCoordinator(coordinator).setProvider(provider).setExporter(exporter)
.setProcessor(processor).setPruner(pruner).build();
} catch (AccumuloException | AccumuloSecurityException e) {
throw new PeriodicApplicationException(e.getMessage());
- }
+ }
}
-
- private static void addRegisteredNotices(NotificationCoordinatorExecutor coord, Snapshot sx) {
+
+ private static void addRegisteredNotices(final NotificationCoordinatorExecutor coord, final Snapshot sx) {
coord.start();
- PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
+ final PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
provider.processRegisteredNotifications(coord, sx);
}
- private static NotificationCoordinatorExecutor getCoordinator(int numThreads, BlockingQueue<TimestampedNotification> notifications) {
+ private static NotificationCoordinatorExecutor getCoordinator(final int numThreads, final BlockingQueue<TimestampedNotification> notifications) {
return new PeriodicNotificationCoordinatorExecutor(numThreads, notifications);
}
- private static KafkaExporterExecutor getExporter(int numThreads, Properties props, BlockingQueue<BindingSetRecord> bindingSets) {
- KafkaProducer<String, BindingSet> producer = new KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe());
+ private static KafkaExporterExecutor getExporter(final int numThreads, final Properties props, final BlockingQueue<BindingSetRecord> bindingSets) {
+ final KafkaProducer<String, BindingSet> producer = new KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe());
return new KafkaExporterExecutor(producer, numThreads, bindingSets);
}
- private static PeriodicQueryPrunerExecutor getPruner(PeriodicQueryResultStorage storage, FluoClient fluo, int numThreads,
- BlockingQueue<NodeBin> bins) {
+ private static PeriodicQueryPrunerExecutor getPruner(final PeriodicQueryResultStorage storage, final FluoClient fluo, final int numThreads,
+ final BlockingQueue<NodeBin> bins) {
return new PeriodicQueryPrunerExecutor(storage, fluo, numThreads, bins);
}
- private static NotificationProcessorExecutor getProcessor(PeriodicQueryResultStorage periodicStorage,
- BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets,
- int numThreads) {
+ private static NotificationProcessorExecutor getProcessor(final PeriodicQueryResultStorage periodicStorage,
+ final BlockingQueue<TimestampedNotification> notifications, final BlockingQueue<NodeBin> bins, final BlockingQueue<BindingSetRecord> bindingSets,
+ final int numThreads) {
return new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, numThreads);
}
- private static KafkaNotificationProvider getProvider(int numThreads, String topic, NotificationCoordinatorExecutor coord,
- Properties props) {
+ private static KafkaNotificationProvider getProvider(final int numThreads, final String topic, final NotificationCoordinatorExecutor coord,
+ final Properties props) {
return new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord,
numThreads);
}
- private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(PeriodicNotificationApplicationConfiguration conf)
+ private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(final PeriodicNotificationApplicationConfiguration conf)
throws AccumuloException, AccumuloSecurityException {
- Instance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), conf.getAccumuloZookeepers());
- Connector conn = instance.getConnector(conf.getAccumuloUser(), new PasswordToken(conf.getAccumuloPassword()));
- String ryaInstance = conf.getTablePrefix();
+ final Instance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), conf.getAccumuloZookeepers());
+ final Connector conn = instance.getConnector(conf.getAccumuloUser(), new PasswordToken(conf.getAccumuloPassword()));
+ final String ryaInstance = conf.getTablePrefix();
return new AccumuloPeriodicQueryResultStorage(conn, ryaInstance);
}
-
- private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) {
- Properties kafkaProps = new Properties();
+
+ private static Properties getKafkaConsumerProperties(final PeriodicNotificationApplicationConfiguration conf) {
+ final Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId());
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ kafkaProps.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000"); // reduce this value to 30 seconds for the scenario where we subscribe before the topic exists.
return kafkaProps;
}
+ private static Properties getKafkaProducerProperties(final PeriodicNotificationApplicationConfiguration conf) {
+ final Properties kafkaProps = new Properties();
+ kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
+ return kafkaProps;
+ }
}