Posted to commits@ignite.apache.org by sa...@apache.org on 2020/08/13 04:03:11 UTC
[ignite] branch master updated: IGNITE-12363 Migrate Camel module to ignite-extensions - Fixes #8132.
This is an automated email from the ASF dual-hosted git repository.
samaitra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 674841e IGNITE-12363 Migrate Camel module to ignite-extensions - Fixes #8132.
674841e is described below
commit 674841e14bb6d872910d2e754764119d10400846
Author: samaitra <sa...@gmail.com>
AuthorDate: Wed Aug 12 23:02:46 2020 -0500
IGNITE-12363 Migrate Camel module to ignite-extensions - Fixes #8132.
Signed-off-by: samaitra <sa...@gmail.com>
---
assembly/libs/README.txt | 1 -
modules/camel/README.txt | 34 --
modules/camel/licenses/apache-2.0.txt | 202 ----------
modules/camel/pom.xml | 116 ------
.../apache/ignite/stream/camel/CamelStreamer.java | 237 -----------
.../apache/ignite/stream/camel/package-info.java | 22 --
.../stream/camel/IgniteCamelStreamerTest.java | 432 ---------------------
.../stream/camel/IgniteCamelStreamerTestSuite.java | 29 --
.../apache/ignite/stream/camel/package-info.java | 22 --
.../camel/src/test/resources/camel.test.properties | 18 -
pom.xml | 1 -
11 files changed, 1114 deletions(-)
diff --git a/assembly/libs/README.txt b/assembly/libs/README.txt
index a551f18..8bb3bad 100644
--- a/assembly/libs/README.txt
+++ b/assembly/libs/README.txt
@@ -73,7 +73,6 @@ All optional modules can be imported just like the core module, but with differe
The following modules are available:
- ignite-aop (for AOP-based grid-enabling)
- ignite-aws (for seemless cluster discovery on AWS S3)
-- ignite-camel (for Apache Camel integration)
- ignite-cassandra (for Apache Cassandra integration)
- ignite-cloud (for Apache JClouds integration)
- ignite-gce (for automatic cluster discovery on Google Compute Engine)
diff --git a/modules/camel/README.txt b/modules/camel/README.txt
deleted file mode 100644
index ca119ae..0000000
--- a/modules/camel/README.txt
+++ /dev/null
@@ -1,34 +0,0 @@
-Apache Ignite Camel Module
---------------------------
-
-Apache Ignite Camel provides a streamer to consume cache tuples from a Camel endpoint such as
-HTTP, TCP, File, FTP, AMQP, SNMP, databases, etc. For more information on available components,
-refer to http://camel.apache.org/components.html.
-
-To enable the Camel module when starting a standalone node, move 'optional/ignite-camel' folder
-to 'libs' folder before running 'ignite.{sh|bat}' script. The content of the module folder will
-be added to classpath in this case.
-
-Importing the Camel module in a Maven project
----------------------------------------------
-
-If you are using Maven to manage dependencies of your project, you can add the Camel module
-dependency like this (replace '${ignite.version}' with actual Ignite version you are
-interested in):
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
- http://maven.apache.org/xsd/maven-4.0.0.xsd">
- ...
- <dependencies>
- ...
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-camel</artifactId>
- <version>${ignite.version}</version>
- </dependency>
- ...
- </dependencies>
- ...
-</project>
diff --git a/modules/camel/licenses/apache-2.0.txt b/modules/camel/licenses/apache-2.0.txt
deleted file mode 100644
index d645695..0000000
--- a/modules/camel/licenses/apache-2.0.txt
+++ /dev/null
@@ -1,202 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
diff --git a/modules/camel/pom.xml b/modules/camel/pom.xml
deleted file mode 100644
index 158a7af..0000000
--- a/modules/camel/pom.xml
+++ /dev/null
@@ -1,116 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!--
- POM file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-parent</artifactId>
- <version>1</version>
- <relativePath>../../parent</relativePath>
- </parent>
-
- <artifactId>ignite-camel</artifactId>
- <version>2.10.0-SNAPSHOT</version>
- <url>http://ignite.apache.org</url>
-
- <properties>
- <okhttp.version>2.5.0</okhttp.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-core</artifactId>
- <version>${camel.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-log4j</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-spring</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-core</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-tools</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-jetty</artifactId>
- <version>${camel.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.squareup.okhttp</groupId>
- <artifactId>okhttp</artifactId>
- <version>${okhttp.version}</version>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <!-- Generate the OSGi MANIFEST.MF for this bundle. -->
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
deleted file mode 100644
index 369d2a6..0000000
--- a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.camel;
-
-import java.util.Map;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Consumer;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.ServiceStatus;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.ServiceHelper;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.stream.StreamAdapter;
-import org.apache.ignite.stream.StreamMultipleTupleExtractor;
-import org.apache.ignite.stream.StreamSingleTupleExtractor;
-
-/**
- * This streamer consumes messages from an Apache Camel consumer endpoint and feeds them into an Ignite data streamer.
- *
- * The only mandatory properties are {@link #endpointUri} and the appropriate stream tuple extractor (either {@link
- * StreamSingleTupleExtractor} or {@link StreamMultipleTupleExtractor}.
- *
- * The user can also provide a custom {@link CamelContext} in case they want to attach custom components, a {@link
- * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
- *
- * @see <a href="http://camel.apache.org">Apache Camel</a>
- * @see <a href="http://camel.apache.org/components.html">Apache Camel components</a>
- */
-public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implements Processor {
- /** Logger. */
- private IgniteLogger log;
-
- /** The Camel Context. */
- private CamelContext camelCtx;
-
- /** The endpoint URI to consume from. */
- private String endpointUri;
-
- /** Camel endpoint. */
- private Endpoint endpoint;
-
- /** Camel consumer. */
- private Consumer consumer;
-
- /** A {@link Processor} to generate the response. */
- private Processor resProc;
-
- /**
- * Starts the streamer.
- *
- * @throws IgniteException In cases when failed to start the streamer.
- */
- public void start() throws IgniteException {
- // Ensure that the endpoint URI is provided.
- A.notNullOrEmpty(endpointUri, "endpoint URI must be provided");
-
- // Check that one and only one tuple extractor is provided.
- A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null),
- "tuple extractor missing");
-
- // If a custom CamelContext is not provided, initialize one.
- if (camelCtx == null)
- camelCtx = new DefaultCamelContext();
-
- // If the Camel Context is starting or started, reject this call to start.
- if (camelCtx.getStatus() == ServiceStatus.Started || camelCtx.getStatus() == ServiceStatus.Starting)
- throw new IgniteException("Failed to start Camel streamer (CamelContext already started or starting).");
-
- log = getIgnite().log();
-
- // Instantiate the Camel endpoint.
- try {
- endpoint = CamelContextHelper.getMandatoryEndpoint(camelCtx, endpointUri);
- }
- catch (Exception e) {
- U.error(log, e);
-
- throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
- }
-
- // Create the Camel consumer.
- try {
- consumer = endpoint.createConsumer(this);
- }
- catch (Exception e) {
- U.error(log, e);
-
- throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
- }
-
- // Start the Camel services.
- try {
- ServiceHelper.startServices(camelCtx, endpoint, consumer);
- }
- catch (Exception e) {
- U.error(log, e);
-
- try {
- ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
-
- consumer = null;
- endpoint = null;
- }
- catch (Exception e1) {
- throw new IgniteException("Failed to start Camel streamer; failed to stop the context, endpoint or " +
- "consumer during rollback of failed initialization [errMsg=" + e.getMessage() + ", stopErrMsg=" +
- e1.getMessage() + ']');
- }
-
- throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
- }
-
- U.log(log, "Started Camel streamer consuming from endpoint URI: " + endpointUri);
- }
-
- /**
- * Stops the streamer.
- *
- * @throws IgniteException In cases if failed to stop the streamer.
- */
- public void stop() throws IgniteException {
- // If the Camel Context is stopping or stopped, reject this call to stop.
- if (camelCtx.getStatus() == ServiceStatus.Stopped || camelCtx.getStatus() == ServiceStatus.Stopping)
- throw new IgniteException("Failed to stop Camel streamer (CamelContext already stopped or stopping).");
-
- // Stop Camel services.
- try {
- ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
- }
- catch (Exception e) {
- throw new IgniteException("Failed to stop Camel streamer [errMsg=" + e.getMessage() + ']');
- }
-
- U.log(log, "Stopped Camel streamer, formerly consuming from endpoint URI: " + endpointUri);
- }
-
- /**
- * Processes the incoming {@link Exchange} and adds the tuple(s) to the underlying streamer.
- *
- * @param exchange The Camel Exchange.
- */
- @Override public void process(Exchange exchange) throws Exception {
- // Extract and insert the tuple(s).
- if (getMultipleTupleExtractor() == null) {
- Map.Entry<K, V> entry = getSingleTupleExtractor().extract(exchange);
- getStreamer().addData(entry);
- }
- else {
- Map<K, V> entries = getMultipleTupleExtractor().extract(exchange);
- getStreamer().addData(entries);
- }
-
- // If the user has set a response processor, invoke it before finishing.
- if (resProc != null)
- resProc.process(exchange);
- }
-
- /**
- * Gets the underlying {@link CamelContext}, whether created automatically by Ignite or the context specified by the
- * user.
- *
- * @return The Camel Context.
- */
- public CamelContext getCamelContext() {
- return camelCtx;
- }
-
- /**
- * Explicitly sets the {@link CamelContext} to use.
- *
- * Doing so gives the user the opportunity to attach custom components, a {@link
- * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
- *
- * @param camelCtx The Camel Context to use. In most cases, an instance of {@link DefaultCamelContext}.
- */
- public void setCamelContext(CamelContext camelCtx) {
- this.camelCtx = camelCtx;
- }
-
- /**
- * Gets the endpoint URI from which to consume.
- *
- * @return The endpoint URI.
- */
- public String getEndpointUri() {
- return endpointUri;
- }
-
- /**
- * Sets the endpoint URI from which to consume. <b>Mandatory.</b>
- *
- * @param endpointUri The endpoint URI.
- */
- public void setEndpointUri(String endpointUri) {
- this.endpointUri = endpointUri;
- }
-
- /**
- * Gets the {@link Processor} used to generate the response.
- *
- * @return The {@link Processor}.
- */
- public Processor getResponseProcessor() {
- return resProc;
- }
-
- /**
- * Sets the {@link Processor} used to generate the response.
- *
- * @param resProc The {@link Processor}.
- */
- public void setResponseProcessor(Processor resProc) {
- this.resProc = resProc;
- }
-}
diff --git a/modules/camel/src/main/java/org/apache/ignite/stream/camel/package-info.java b/modules/camel/src/main/java/org/apache/ignite/stream/camel/package-info.java
deleted file mode 100644
index a133a75..0000000
--- a/modules/camel/src/main/java/org/apache/ignite/stream/camel/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains implementation of Camel Streamer.
- */
-
-package org.apache.ignite.stream.camel;
diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
deleted file mode 100644
index 9858357..0000000
--- a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.camel;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.squareup.okhttp.MediaType;
-import com.squareup.okhttp.OkHttpClient;
-import com.squareup.okhttp.Request;
-import com.squareup.okhttp.RequestBody;
-import com.squareup.okhttp.Response;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.ServiceStatus;
-import org.apache.camel.component.properties.PropertiesComponent;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.support.LifecycleStrategySupport;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.events.CacheEvent;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.util.lang.GridMapEntry;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.stream.StreamMultipleTupleExtractor;
-import org.apache.ignite.stream.StreamSingleTupleExtractor;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Test;
-
-import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
-
-/**
- * Test class for {@link CamelStreamer}.
- */
-public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
- /** text/plain media type. */
- private static final MediaType TEXT_PLAIN = MediaType.parse("text/plain;charset=utf-8");
-
- /** The test data. */
- private static final Map<Integer, String> TEST_DATA = new HashMap<>();
-
- /** The Camel streamer currently under test. */
- private CamelStreamer<Integer, String> streamer;
-
- /** The Ignite data streamer. */
- private IgniteDataStreamer<Integer, String> dataStreamer;
-
- /** URL where the REST service will be exposed. */
- private String url;
-
- /** The UUID of the currently active remote listener. */
- private UUID remoteLsnr;
-
- /** The OkHttpClient. */
- private OkHttpClient httpClient = new OkHttpClient();
-
- // Initialize the test data.
- static {
- for (int i = 0; i < 100; i++)
- TEST_DATA.put(i, "v" + i);
- }
-
- /** Constructor. */
- public IgniteCamelStreamerTest() {
- super(true);
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- return super.getConfiguration(igniteInstanceName).setIncludeEventTypes(EventType.EVTS_ALL);
- }
-
- @SuppressWarnings("unchecked")
- @Override public void beforeTest() throws Exception {
- grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
-
- // find an available local port
- try (ServerSocket ss = new ServerSocket(0)) {
- int port = ss.getLocalPort();
-
- url = "http://localhost:" + port + "/ignite";
- }
-
- // create Camel streamer
- dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME);
- streamer = createCamelStreamer(dataStreamer);
- }
-
- @Override public void afterTest() throws Exception {
- try {
- streamer.stop();
- }
- catch (Exception ignored) {
- // ignore if already stopped
- }
-
- dataStreamer.close();
-
- grid().cache(DEFAULT_CACHE_NAME).clear();
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testSendOneEntryPerMessage() throws Exception {
- streamer.setSingleTupleExtractor(singleTupleExtractor());
-
- // Subscribe to cache PUT events.
- CountDownLatch latch = subscribeToPutEvents(50);
-
- // Action time.
- streamer.start();
-
- // Send messages.
- sendMessages(0, 50, false);
-
- // Assertions.
- assertTrue(latch.await(10, TimeUnit.SECONDS));
- assertCacheEntriesLoaded(50);
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testMultipleEntriesInOneMessage() throws Exception {
- streamer.setMultipleTupleExtractor(multipleTupleExtractor());
-
- // Subscribe to cache PUT events.
- CountDownLatch latch = subscribeToPutEvents(50);
-
- // Action time.
- streamer.start();
-
- // Send messages.
- sendMessages(0, 50, true);
-
- // Assertions.
- assertTrue(latch.await(10, TimeUnit.SECONDS));
- assertCacheEntriesLoaded(50);
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testResponseProcessorIsCalled() throws Exception {
- streamer.setSingleTupleExtractor(singleTupleExtractor());
- streamer.setResponseProcessor(new Processor() {
- @Override public void process(Exchange exchange) throws Exception {
- exchange.getOut().setBody("Foo bar");
- }
- });
-
- // Subscribe to cache PUT events.
- CountDownLatch latch = subscribeToPutEvents(50);
-
- // Action time.
- streamer.start();
-
- // Send messages.
- List<String> responses = sendMessages(0, 50, false);
-
- for (String r : responses)
- assertEquals("Foo bar", r);
-
- // Assertions.
- assertTrue(latch.await(10, TimeUnit.SECONDS));
- assertCacheEntriesLoaded(50);
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserSpecifiedCamelContext() throws Exception {
- final AtomicInteger cnt = new AtomicInteger();
-
- // Create a CamelContext with a probe that'll help us know if it has been used.
- CamelContext context = new DefaultCamelContext();
- context.setTracing(true);
- context.addLifecycleStrategy(new LifecycleStrategySupport() {
- @Override public void onEndpointAdd(Endpoint endpoint) {
- cnt.incrementAndGet();
- }
- });
-
- streamer.setSingleTupleExtractor(singleTupleExtractor());
- streamer.setCamelContext(context);
-
- // Subscribe to cache PUT events.
- CountDownLatch latch = subscribeToPutEvents(50);
-
- // Action time.
- streamer.start();
-
- // Send messages.
- sendMessages(0, 50, false);
-
- // Assertions.
- assertTrue(latch.await(10, TimeUnit.SECONDS));
- assertCacheEntriesLoaded(50);
- assertTrue(cnt.get() > 0);
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserSpecifiedCamelContextWithPropertyPlaceholders() throws Exception {
- // Create a CamelContext with a custom property placeholder.
- CamelContext context = new DefaultCamelContext();
-
- PropertiesComponent pc = new PropertiesComponent("camel.test.properties");
-
- context.addComponent("properties", pc);
-
- // Replace the context path in the test URL with the property placeholder.
- url = url.replaceAll("/ignite", "{{test.contextPath}}");
-
- // Recreate the Camel streamer with the new URL.
- streamer = createCamelStreamer(dataStreamer);
-
- streamer.setSingleTupleExtractor(singleTupleExtractor());
- streamer.setCamelContext(context);
-
- // Subscribe to cache PUT events.
- CountDownLatch latch = subscribeToPutEvents(50);
-
- // Action time.
- streamer.start();
-
- // Before sending the messages, get the actual URL after the property placeholder was resolved,
- // stripping the jetty: prefix from it.
- url = streamer.getCamelContext().getEndpoints().iterator().next().getEndpointUri().replaceAll("jetty:", "");
-
- // Send messages.
- sendMessages(0, 50, false);
-
- // Assertions.
- assertTrue(latch.await(10, TimeUnit.SECONDS));
- assertCacheEntriesLoaded(50);
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testInvalidEndpointUri() throws Exception {
- streamer.setSingleTupleExtractor(singleTupleExtractor());
- streamer.setEndpointUri("abc");
-
- // Action time.
- try {
- streamer.start();
- fail("Streamer started; should have failed.");
- }
- catch (IgniteException ignored) {
- assertTrue(streamer.getCamelContext().getStatus() == ServiceStatus.Stopped);
- assertTrue(streamer.getCamelContext().getEndpointRegistry().size() == 0);
- }
- }
-
- /**
- * Creates a Camel streamer.
- */
- private CamelStreamer<Integer, String> createCamelStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
- CamelStreamer<Integer, String> streamer = new CamelStreamer<>();
-
- streamer.setIgnite(grid());
- streamer.setStreamer(dataStreamer);
- streamer.setEndpointUri("jetty:" + url);
-
- dataStreamer.allowOverwrite(true);
- dataStreamer.autoFlushFrequency(1);
-
- return streamer;
- }
-
- /**
- * @throws IOException
- * @return HTTP response payloads.
- */
- private List<String> sendMessages(int fromIdx, int cnt, boolean singleMessage) throws IOException {
- List<String> responses = Lists.newArrayList();
-
- if (singleMessage) {
- StringBuilder sb = new StringBuilder();
-
- for (int i = fromIdx; i < fromIdx + cnt; i++)
- sb.append(i).append(",").append(TEST_DATA.get(i)).append("\n");
-
- Request request = new Request.Builder()
- .url(url)
- .post(RequestBody.create(TEXT_PLAIN, sb.toString()))
- .build();
-
- Response response = httpClient.newCall(request).execute();
-
- responses.add(response.body().string());
- }
- else {
- for (int i = fromIdx; i < fromIdx + cnt; i++) {
- String payload = i + "," + TEST_DATA.get(i);
-
- Request request = new Request.Builder()
- .url(url)
- .post(RequestBody.create(TEXT_PLAIN, payload))
- .build();
-
- Response response = httpClient.newCall(request).execute();
-
- responses.add(response.body().string());
- }
- }
-
- return responses;
- }
-
- /**
- * Returns a {@link StreamSingleTupleExtractor} for testing.
- */
- private static StreamSingleTupleExtractor<Exchange, Integer, String> singleTupleExtractor() {
- return new StreamSingleTupleExtractor<Exchange, Integer, String>() {
- @Override public Map.Entry<Integer, String> extract(Exchange exchange) {
- List<String> s = Splitter.on(",").splitToList(exchange.getIn().getBody(String.class));
-
- return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
- }
- };
- }
-
- /**
- * Returns a {@link StreamMultipleTupleExtractor} for testing.
- */
- private static StreamMultipleTupleExtractor<Exchange, Integer, String> multipleTupleExtractor() {
- return new StreamMultipleTupleExtractor<Exchange, Integer, String>() {
- @Override public Map<Integer, String> extract(Exchange exchange) {
- final Map<String, String> map = Splitter.on("\n")
- .omitEmptyStrings()
- .withKeyValueSeparator(",")
- .split(exchange.getIn().getBody(String.class));
-
- final Map<Integer, String> answer = new HashMap<>();
-
- F.forEach(map.keySet(), new IgniteInClosure<String>() {
- @Override public void apply(String s) {
- answer.put(Integer.parseInt(s), map.get(s));
- }
- });
-
- return answer;
- }
- };
- }
-
- /**
- * Subscribe to cache put events.
- */
- private CountDownLatch subscribeToPutEvents(int expect) {
- Ignite ignite = grid();
-
- // Listen to cache PUT events and expect as many events as test data items
- final CountDownLatch latch = new CountDownLatch(expect);
- @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback =
- new IgniteBiPredicate<UUID, CacheEvent>() {
- @Override public boolean apply(UUID uuid, CacheEvent evt) {
- latch.countDown();
-
- return true;
- }
- };
-
- remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME))
- .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
-
- return latch;
- }
-
- /**
- * Assert a given number of cache entries have been loaded.
- */
- private void assertCacheEntriesLoaded(int cnt) {
- // get the cache and check that the entries are present
- IgniteCache<Integer, String> cache = grid().cache(DEFAULT_CACHE_NAME);
-
- // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
- for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, cnt))
- assertEquals(TEST_DATA.get(key), cache.get(key));
-
- // assert that the cache contains exactly the specified number of elements
- assertEquals(cnt, cache.size(CachePeekMode.ALL));
-
- // remove the event listener
- grid().events(grid().cluster().forCacheNodes(DEFAULT_CACHE_NAME)).stopRemoteListen(remoteLsnr);
- }
-
-}
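For readers following the removed test, the payload format it posts to the streamer ("key,value" for a single entry, newline-separated "key,value" lines for a batch) can be sketched without the Camel, Guava or Ignite dependencies. This is only an illustrative sketch: the class name PayloadParsingSketch and its two methods are hypothetical and are not part of the removed module. It also differs from the removed extractors in one respect, which is an assumption made here for simplicity: it splits on the first comma only, so any further commas stay in the value.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the tuple extractors used by the removed test. */
public class PayloadParsingSketch {
    /** Parses one "key,value" payload into a single entry (splits on the first comma). */
    static Map.Entry<Integer, String> parseSingle(String body) {
        int comma = body.indexOf(',');

        if (comma < 0)
            throw new IllegalArgumentException("Expected 'key,value' but got: " + body);

        return new AbstractMap.SimpleImmutableEntry<>(
            Integer.parseInt(body.substring(0, comma).trim()),
            body.substring(comma + 1));
    }

    /** Parses newline-separated "key,value" lines, skipping empty lines. */
    static Map<Integer, String> parseMultiple(String body) {
        Map<Integer, String> res = new LinkedHashMap<>();

        for (String line : body.split("\n")) {
            if (line.isEmpty())
                continue;

            Map.Entry<Integer, String> e = parseSingle(line);

            res.put(e.getKey(), e.getValue());
        }

        return res;
    }

    public static void main(String[] args) {
        // Same payload shapes that sendMessages() builds in the removed test.
        System.out.println(parseSingle("7,v7"));
        System.out.println(parseMultiple("0,v0\n1,v1\n"));
    }
}
```

In the removed test the equivalent logic lives inside StreamSingleTupleExtractor and StreamMultipleTupleExtractor implementations that read the body from a Camel Exchange; the sketch above covers only the string handling.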
diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
deleted file mode 100644
index 7028a5c..0000000
--- a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.camel;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-
-/**
- * Camel streamer tests. Included into 'Streamers' run configuration.
- */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteCamelStreamerTest.class})
-public class IgniteCamelStreamerTestSuite {
-}
diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/package-info.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/package-info.java
deleted file mode 100644
index a133a75..0000000
--- a/modules/camel/src/test/java/org/apache/ignite/stream/camel/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains implementation of Camel Streamer.
- */
-
-package org.apache.ignite.stream.camel;
diff --git a/modules/camel/src/test/resources/camel.test.properties b/modules/camel/src/test/resources/camel.test.properties
deleted file mode 100644
index 30459be..0000000
--- a/modules/camel/src/test/resources/camel.test.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-test.contextPath = /ignite-properties
diff --git a/pom.xml b/pom.xml
index 0dfd354..f9fff5a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,6 @@
<module>modules/jms11</module>
<module>modules/zookeeper</module>
<module>modules/hibernate-core</module>
- <module>modules/camel</module>
<module>modules/osgi-paxlogging</module>
<module>modules/osgi-karaf</module>
<module>modules/osgi</module>