You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/18 21:22:01 UTC
[3/5] ignite git commit: IGNITE-1790 Implement Apache Camel streamer.
IGNITE-1790 Implement Apache Camel streamer.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c490de38
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c490de38
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c490de38
Branch: refs/heads/ignite-single-op-get
Commit: c490de38c7b841fe51fec1001368bda096e82100
Parents: 175b7f2
Author: Raul Kripalani <ra...@apache.org>
Authored: Wed Nov 18 18:03:23 2015 +0000
Committer: Raul Kripalani <ra...@apache.org>
Committed: Wed Nov 18 18:03:23 2015 +0000
----------------------------------------------------------------------
modules/camel/pom.xml | 102 +++++
.../ignite/stream/camel/CamelStreamer.java | 237 +++++++++++
.../stream/camel/IgniteCamelStreamerTest.java | 420 +++++++++++++++++++
.../camel/IgniteCamelStreamerTestSuite.java | 48 +++
.../src/test/resources/camel.test.properties | 18 +
.../org/apache/ignite/stream/StreamAdapter.java | 19 +-
pom.xml | 1 +
7 files changed, 835 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/pom.xml
----------------------------------------------------------------------
diff --git a/modules/camel/pom.xml b/modules/camel/pom.xml
new file mode 100644
index 0000000..60f0597
--- /dev/null
+++ b/modules/camel/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-camel</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <url>http://ignite.apache.org</url>
+
+ <properties>
+ <camel.version>2.16.0</camel.version>
+ <guava.version>18.0</guava.version>
+ <okhttp.version>2.5.0</okhttp.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ <version>${camel.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-jetty</artifactId>
+ <version>${camel.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>${okhttp.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
new file mode 100644
index 0000000..40ed6b3
--- /dev/null
+++ b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.camel;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.stream.StreamAdapter;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+
+/**
+ * This streamer consumes messages from an Apache Camel consumer endpoint and feeds them into an Ignite data streamer.
+ *
+ * The only mandatory properties are {@link #endpointUri} and the appropriate stream tuple extractor (either {@link
+ * StreamSingleTupleExtractor} or {@link StreamMultipleTupleExtractor)}.
+ *
+ * The user can also provide a custom {@link CamelContext} in case they want to attach custom components, a {@link
+ * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
+ *
+ * @see <a href="http://camel.apache.org">Apache Camel</a>
+ * @see <a href="http://camel.apache.org/components.html">Apache Camel components</a>
+ */
+public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implements Processor {
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** The Camel Context. */
+ private CamelContext camelCtx;
+
+ /** The endpoint URI to consume from. */
+ private String endpointUri;
+
+ /** Camel endpoint. */
+ private Endpoint endpoint;
+
+ /** Camel consumer. */
+ private Consumer consumer;
+
+ /** A {@link Processor} to generate the response. */
+ private Processor resProc;
+
+ /**
+ * Starts the streamer.
+ *
+ * @throws IgniteException In cases when failed to start the streamer.
+ */
+ public void start() throws IgniteException {
+ // Ensure that the endpoint URI is provided.
+ A.notNullOrEmpty(endpointUri, "endpoint URI must be provided");
+
+ // Check that one and only one tuple extractor is provided.
+ A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null),
+ "tuple extractor missing");
+
+ // If a custom CamelContext is not provided, initialize one.
+ if (camelCtx == null)
+ camelCtx = new DefaultCamelContext();
+
+ // If the Camel Context is starting or started, reject this call to start.
+ if (camelCtx.getStatus() == ServiceStatus.Started || camelCtx.getStatus() == ServiceStatus.Starting)
+ throw new IgniteException("Failed to start Camel streamer (CamelContext already started or starting).");
+
+ log = getIgnite().log();
+
+ // Instantiate the Camel endpoint.
+ try {
+ endpoint = CamelContextHelper.getMandatoryEndpoint(camelCtx, endpointUri);
+ }
+ catch (Exception e) {
+ U.error(log, e);
+
+ throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
+ }
+
+ // Create the Camel consumer.
+ try {
+ consumer = endpoint.createConsumer(this);
+ }
+ catch (Exception e) {
+ U.error(log, e);
+
+ throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
+ }
+
+ // Start the Camel services.
+ try {
+ ServiceHelper.startServices(camelCtx, endpoint, consumer);
+ }
+ catch (Exception e) {
+ U.error(log, e);
+
+ try {
+ ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
+
+ consumer = null;
+ endpoint = null;
+ }
+ catch (Exception e1) {
+ throw new IgniteException("Failed to start Camel streamer; failed to stop the context, endpoint or " +
+ "consumer during rollback of failed initialization [errMsg=" + e.getMessage() + ", stopErrMsg=" +
+ e1.getMessage() + ']');
+ }
+
+ throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
+ }
+
+ U.log(log, "Started Camel streamer consuming from endpoint URI: " + endpointUri);
+ }
+
+ /**
+ * Stops the streamer.
+ *
+ * @throws IgniteException In cases if failed to stop the streamer.
+ */
+ public void stop() throws IgniteException {
+ // If the Camel Context is stopping or stopped, reject this call to stop.
+ if (camelCtx.getStatus() == ServiceStatus.Stopped || camelCtx.getStatus() == ServiceStatus.Stopping)
+ throw new IgniteException("Failed to stop Camel streamer (CamelContext already stopped or stopping).");
+
+ // Stop Camel services.
+ try {
+ ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
+ }
+ catch (Exception e) {
+ throw new IgniteException("Failed to stop Camel streamer [errMsg=" + e.getMessage() + ']');
+ }
+
+ U.log(log, "Stopped Camel streamer, formerly consuming from endpoint URI: " + endpointUri);
+ }
+
+ /**
+ * Processes the incoming {@link Exchange} and adds the tuple(s) to the underlying streamer.
+ *
+ * @param exchange The Camel Exchange.
+ */
+ @Override public void process(Exchange exchange) throws Exception {
+ // Extract and insert the tuple(s).
+ if (getMultipleTupleExtractor() == null) {
+ Map.Entry<K, V> entry = getSingleTupleExtractor().extract(exchange);
+ getStreamer().addData(entry);
+ }
+ else {
+ Map<K, V> entries = getMultipleTupleExtractor().extract(exchange);
+ getStreamer().addData(entries);
+ }
+
+ // If the user has set a response processor, invoke it before finishing.
+ if (resProc != null)
+ resProc.process(exchange);
+ }
+
+ /**
+ * Gets the underlying {@link CamelContext}, whether created automatically by Ignite or the context specified by the
+ * user.
+ *
+ * @return The Camel Context.
+ */
+ public CamelContext getCamelContext() {
+ return camelCtx;
+ }
+
+ /**
+ * Explicitly sets the {@link CamelContext} to use.
+ *
+ * Doing so gives the user the opportunity to attach custom components, a {@link
+ * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
+ *
+ * @param camelCtx The Camel Context to use. In most cases, an instance of {@link DefaultCamelContext}.
+ */
+ public void setCamelContext(CamelContext camelCtx) {
+ this.camelCtx = camelCtx;
+ }
+
+ /**
+ * Gets the endpoint URI from which to consume.
+ *
+ * @return The endpoint URI.
+ */
+ public String getEndpointUri() {
+ return endpointUri;
+ }
+
+ /**
+ * Sets the endpoint URI from which to consume. <b>Mandatory.</b>
+ *
+ * @param endpointUri The endpoint URI.
+ */
+ public void setEndpointUri(String endpointUri) {
+ this.endpointUri = endpointUri;
+ }
+
+ /**
+ * Gets the {@link Processor} used to generate the response.
+ *
+ * @return The {@link Processor}.
+ */
+ public Processor getResponseProcessor() {
+ return resProc;
+ }
+
+ /**
+ * Sets the {@link Processor} used to generate the response.
+ *
+ * @param resProc The {@link Processor}.
+ */
+ public void setResponseProcessor(Processor resProc) {
+ this.resProc = resProc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
new file mode 100644
index 0000000..4795dff
--- /dev/null
+++ b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.camel;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.component.properties.PropertiesComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.LifecycleStrategySupport;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.lang.GridMapEntry;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.RequestBody;
+import com.squareup.okhttp.Response;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/**
+ * Test class for {@link CamelStreamer}.
+ */
+public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
+ /** text/plain media type. */
+ private static final MediaType TEXT_PLAIN = MediaType.parse("text/plain;charset=utf-8");
+
+ /** The test data. */
+ private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+
+ /** The Camel streamer currently under test. */
+ private CamelStreamer<Integer, String> streamer;
+
+ /** The Ignite data streamer. */
+ private IgniteDataStreamer<Integer, String> dataStreamer;
+
+ /** URL where the REST service will be exposed. */
+ private String url;
+
+ /** The UUID of the currently active remote listener. */
+ private UUID remoteLsnr;
+
+ /** The OkHttpClient. */
+ private OkHttpClient httpClient = new OkHttpClient();
+
+ // Initialize the test data.
+ static {
+ for (int i = 0; i < 100; i++)
+ TEST_DATA.put(i, "v" + i);
+ }
+
+ /** Constructor. */
+ public IgniteCamelStreamerTest() {
+ super(true);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override public void beforeTest() throws Exception {
+ grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+ // find an available local port
+ try (ServerSocket ss = new ServerSocket(0)) {
+ int port = ss.getLocalPort();
+
+ url = "http://localhost:" + port + "/ignite";
+ }
+
+ // create Camel streamer
+ dataStreamer = grid().dataStreamer(null);
+ streamer = createCamelStreamer(dataStreamer);
+ }
+
+ @Override public void afterTest() throws Exception {
+ try {
+ streamer.stop();
+ }
+ catch (Exception e) {
+ // ignore if already stopped
+ }
+
+ dataStreamer.close();
+
+ grid().cache(null).clear();
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testSendOneEntryPerMessage() throws Exception {
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+
+ // Subscribe to cache PUT events.
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // Action time.
+ streamer.start();
+
+ // Send messages.
+ sendMessages(0, 50, false);
+
+ // Assertions.
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testMultipleEntriesInOneMessage() throws Exception {
+ streamer.setMultipleTupleExtractor(multipleTupleExtractor());
+
+ // Subscribe to cache PUT events.
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // Action time.
+ streamer.start();
+
+ // Send messages.
+ sendMessages(0, 50, true);
+
+ // Assertions.
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testResponseProcessorIsCalled() throws Exception {
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setResponseProcessor(new Processor() {
+ @Override public void process(Exchange exchange) throws Exception {
+ exchange.getOut().setBody("Foo bar");
+ }
+ });
+
+ // Subscribe to cache PUT events.
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // Action time.
+ streamer.start();
+
+ // Send messages.
+ List<String> responses = sendMessages(0, 50, false);
+
+ for (String r : responses)
+ assertEquals("Foo bar", r);
+
+ // Assertions.
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testUserSpecifiedCamelContext() throws Exception {
+ final AtomicInteger cnt = new AtomicInteger();
+
+ // Create a CamelContext with a probe that'll help us know if it has been used.
+ CamelContext context = new DefaultCamelContext();
+ context.setTracing(true);
+ context.addLifecycleStrategy(new LifecycleStrategySupport() {
+ @Override public void onEndpointAdd(Endpoint endpoint) {
+ cnt.incrementAndGet();
+ }
+ });
+
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setCamelContext(context);
+
+ // Subscribe to cache PUT events.
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // Action time.
+ streamer.start();
+
+ // Send messages.
+ sendMessages(0, 50, false);
+
+ // Assertions.
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+ assertTrue(cnt.get() > 0);
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testUserSpecifiedCamelContextWithPropertyPlaceholders() throws Exception {
+ // Create a CamelContext with a custom property placeholder.
+ CamelContext context = new DefaultCamelContext();
+
+ PropertiesComponent pc = new PropertiesComponent("camel.test.properties");
+
+ context.addComponent("properties", pc);
+
+ // Replace the context path in the test URL with the property placeholder.
+ url = url.replaceAll("/ignite", "{{test.contextPath}}");
+
+ // Recreate the Camel streamer with the new URL.
+ streamer = createCamelStreamer(dataStreamer);
+
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setCamelContext(context);
+
+ // Subscribe to cache PUT events.
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // Action time.
+ streamer.start();
+
+ // Before sending the messages, get the actual URL after the property placeholder was resolved,
+ // stripping the jetty: prefix from it.
+ url = streamer.getCamelContext().getEndpoints().iterator().next().getEndpointUri().replaceAll("jetty:", "");
+
+ // Send messages.
+ sendMessages(0, 50, false);
+
+ // Assertions.
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testInvalidEndpointUri() throws Exception {
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setEndpointUri("abc");
+
+ // Action time.
+ try {
+ streamer.start();
+ fail("Streamer started; should have failed.");
+ }
+ catch (IgniteException e) {
+ assertTrue(streamer.getCamelContext().getStatus() == ServiceStatus.Stopped);
+ assertTrue(streamer.getCamelContext().getEndpointRegistry().size() == 0);
+ }
+ }
+
+ /**
+ * Creates a Camel streamer.
+ */
+ private CamelStreamer<Integer, String> createCamelStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
+ CamelStreamer<Integer, String> streamer = new CamelStreamer<>();
+
+ streamer.setIgnite(grid());
+ streamer.setStreamer(dataStreamer);
+ streamer.setEndpointUri("jetty:" + url);
+
+ dataStreamer.allowOverwrite(true);
+ dataStreamer.autoFlushFrequency(1);
+
+ return streamer;
+ }
+
+ /**
+ * @throws IOException
+ * @return HTTP response payloads.
+ */
+ private List<String> sendMessages(int fromIdx, int cnt, boolean singleMessage) throws IOException {
+ List<String> responses = Lists.newArrayList();
+
+ if (singleMessage) {
+ StringBuilder sb = new StringBuilder();
+
+ for (int i = fromIdx; i < fromIdx + cnt; i++)
+ sb.append(i).append(",").append(TEST_DATA.get(i)).append("\n");
+
+ Request request = new Request.Builder()
+ .url(url)
+ .post(RequestBody.create(TEXT_PLAIN, sb.toString()))
+ .build();
+
+ Response response = httpClient.newCall(request).execute();
+
+ responses.add(response.body().string());
+ }
+ else {
+ for (int i = fromIdx; i < fromIdx + cnt; i++) {
+ String payload = i + "," + TEST_DATA.get(i);
+
+ Request request = new Request.Builder()
+ .url(url)
+ .post(RequestBody.create(TEXT_PLAIN, payload))
+ .build();
+
+ Response response = httpClient.newCall(request).execute();
+
+ responses.add(response.body().string());
+ }
+ }
+
+ return responses;
+ }
+
+ /**
+ * Returns a {@link StreamSingleTupleExtractor} for testing.
+ */
+ private static StreamSingleTupleExtractor<Exchange, Integer, String> singleTupleExtractor() {
+ return new StreamSingleTupleExtractor<Exchange, Integer, String>() {
+ @Override public Map.Entry<Integer, String> extract(Exchange exchange) {
+ List<String> s = Splitter.on(",").splitToList(exchange.getIn().getBody(String.class));
+
+ return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
+ }
+ };
+ }
+
+ /**
+ * Returns a {@link StreamMultipleTupleExtractor} for testing.
+ */
+ private static StreamMultipleTupleExtractor<Exchange, Integer, String> multipleTupleExtractor() {
+ return new StreamMultipleTupleExtractor<Exchange, Integer, String>() {
+ @Override public Map<Integer, String> extract(Exchange exchange) {
+ final Map<String, String> map = Splitter.on("\n")
+ .omitEmptyStrings()
+ .withKeyValueSeparator(",")
+ .split(exchange.getIn().getBody(String.class));
+
+ final Map<Integer, String> answer = new HashMap<>();
+
+ F.forEach(map.keySet(), new IgniteInClosure<String>() {
+ @Override public void apply(String s) {
+ answer.put(Integer.parseInt(s), map.get(s));
+ }
+ });
+
+ return answer;
+ }
+ };
+ }
+
+ /**
+ * Subscribe to cache put events.
+ */
+ private CountDownLatch subscribeToPutEvents(int expect) {
+ Ignite ignite = grid();
+
+ // Listen to cache PUT events and expect as many as messages as test data items
+ final CountDownLatch latch = new CountDownLatch(expect);
+ @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback =
+ new IgniteBiPredicate<UUID, CacheEvent>() {
+ @Override public boolean apply(UUID uuid, CacheEvent evt) {
+ latch.countDown();
+
+ return true;
+ }
+ };
+
+ remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(null))
+ .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
+
+ return latch;
+ }
+
+ /**
+ * Assert a given number of cache entries have been loaded.
+ */
+ private void assertCacheEntriesLoaded(int cnt) {
+ // get the cache and check that the entries are present
+ IgniteCache<Integer, String> cache = grid().cache(null);
+
+ // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
+ for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, cnt))
+ assertEquals(TEST_DATA.get(key), cache.get(key));
+
+ // assert that the cache exactly the specified amount of elements
+ assertEquals(cnt, cache.size(CachePeekMode.ALL));
+
+ // remove the event listener
+ grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteLsnr);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
new file mode 100644
index 0000000..266c9cf
--- /dev/null
+++ b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.camel;
+
+import java.util.Set;
+
+import junit.framework.TestSuite;
+
+/**
+ * Camel streamer tests.
+ */
+public class IgniteCamelStreamerTestSuite extends TestSuite {
+ /**
+ * @return {@link IgniteCamelStreamerTest} test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ return suite(null);
+ }
+
+ /**
+ * @param ignoredTests
+ * @return Test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite(Set<Class> ignoredTests) throws Exception {
+ TestSuite suite = new TestSuite("IgniteCamelStreamer Test Suite");
+
+ suite.addTestSuite(IgniteCamelStreamerTest.class);
+
+ return suite;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/camel/src/test/resources/camel.test.properties
----------------------------------------------------------------------
diff --git a/modules/camel/src/test/resources/camel.test.properties b/modules/camel/src/test/resources/camel.test.properties
new file mode 100644
index 0000000..30459be
--- /dev/null
+++ b/modules/camel/src/test/resources/camel.test.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+test.contextPath = /ignite-properties
http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index 2cb7db7..afc1530 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -37,7 +37,6 @@ import org.apache.ignite.IgniteDataStreamer;
* </ol>
*/
public abstract class StreamAdapter<T, K, V> {
-
/** Tuple extractor extracting a single tuple from an event */
private StreamSingleTupleExtractor<T, K, V> singleTupleExtractor;
@@ -99,9 +98,9 @@ public abstract class StreamAdapter<T, K, V> {
*/
@Deprecated
public StreamTupleExtractor<T, K, V> getTupleExtractor() {
- if (singleTupleExtractor instanceof StreamTupleExtractor) {
+ if (singleTupleExtractor instanceof StreamTupleExtractor)
return (StreamTupleExtractor) singleTupleExtractor;
- }
+
throw new IllegalArgumentException("This method is deprecated and only relevant if using an old " +
"StreamTupleExtractor; use getSingleTupleExtractor instead");
}
@@ -112,9 +111,9 @@ public abstract class StreamAdapter<T, K, V> {
*/
@Deprecated
public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
- if (multipleTupleExtractor != null) {
+ if (multipleTupleExtractor != null)
throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
- }
+
this.singleTupleExtractor = extractor;
}
@@ -129,9 +128,9 @@ public abstract class StreamAdapter<T, K, V> {
* @param singleTupleExtractor Extractor for key-value tuples from messages.
*/
public void setSingleTupleExtractor(StreamSingleTupleExtractor<T, K, V> singleTupleExtractor) {
- if (multipleTupleExtractor != null) {
+ if (multipleTupleExtractor != null)
throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
- }
+
this.singleTupleExtractor = singleTupleExtractor;
}
@@ -146,9 +145,9 @@ public abstract class StreamAdapter<T, K, V> {
* @param multipleTupleExtractor Extractor for 1:n tuple extraction.
*/
public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
- if (singleTupleExtractor != null) {
+ if (singleTupleExtractor != null)
throw new IllegalArgumentException("Single tuple extractor already set; cannot set both types at once.");
- }
+
this.multipleTupleExtractor = multipleTupleExtractor;
}
@@ -188,4 +187,4 @@ public abstract class StreamAdapter<T, K, V> {
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c490de38/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c40b551..b9c51b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@
<module>modules/jms11</module>
<module>modules/mqtt</module>
<module>modules/zookeeper</module>
+ <module>modules/camel</module>
<module>modules/platform</module>
</modules>