You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/11/04 15:28:17 UTC
[37/37] ignite git commit: ignite-1790: minor fixes in order to align
the code with the guidelines
ignite-1790: minor fixes in order to align the code with the guidelines
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1d4e035
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1d4e035
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1d4e035
Branch: refs/heads/ignite-1790
Commit: a1d4e03577b804b350a9ecec9dd0f8bb8a8b2c0f
Parents: 7eb1b9d
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Nov 4 17:26:01 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Nov 4 17:26:01 2015 +0300
----------------------------------------------------------------------
.../ignite/stream/camel/CamelStreamer.java | 91 ++++++++++----------
.../stream/camel/IgniteCamelStreamerTest.java | 11 ++-
.../org/apache/ignite/stream/StreamAdapter.java | 19 ++--
3 files changed, 59 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1d4e035/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
index 38b16f3..17bbadb 100644
--- a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
+++ b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
@@ -18,7 +18,6 @@
package org.apache.ignite.stream.camel;
import java.util.Map;
-
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
@@ -32,6 +31,7 @@ import org.apache.camel.util.ServiceHelper;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamAdapter;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
@@ -39,22 +39,21 @@ import org.apache.ignite.stream.StreamSingleTupleExtractor;
/**
* This streamer consumes messages from an Apache Camel consumer endpoint and feeds them into an Ignite data streamer.
*
- * The only mandatory properties are {@link #endpointUri} and the appropriate stream tuple extractor
- * (either {@link StreamSingleTupleExtractor} or {@link StreamMultipleTupleExtractor)}.
+ * The only mandatory properties are {@link #endpointUri} and the appropriate stream tuple extractor (either {@link
+ * StreamSingleTupleExtractor} or {@link StreamMultipleTupleExtractor)}.
*
- * The user can also provide a custom {@link CamelContext} in case they want to attach custom components, a
- * {@link org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
+ * The user can also provide a custom {@link CamelContext} in case they want to attach custom components, a {@link
+ * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
*
* @see <a href="http://camel.apache.org">Apache Camel</a>
* @see <a href="http://camel.apache.org/components.html">Apache Camel components</a>
*/
public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implements Processor {
-
/** Logger. */
private IgniteLogger log;
/** The Camel Context. */
- private CamelContext camelContext;
+ private CamelContext camelCtx;
/** The endpoint URI to consume from. */
private String endpointUri;
@@ -66,12 +65,12 @@ public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implement
private Consumer consumer;
/** A {@link Processor} to generate the response. */
- private Processor responseProcessor;
+ private Processor resProc;
/**
* Starts the streamer.
*
- * @throws IgniteException
+ * @throws IgniteException In cases when failed to start the streamer.
*/
public void start() throws IgniteException {
// Ensure that the endpoint URI is provided.
@@ -84,21 +83,23 @@ public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implement
"cannot provide both single and multiple tuple extractor");
// If a custom CamelContext is not provided, initialize one.
- if (camelContext == null)
- camelContext = new DefaultCamelContext();
+ if (camelCtx == null)
+ camelCtx = new DefaultCamelContext();
// If the Camel Context is starting or started, reject this call to start.
- if (camelContext.getStatus() == ServiceStatus.Started || camelContext.getStatus() == ServiceStatus.Starting)
+ if (camelCtx.getStatus() == ServiceStatus.Started || camelCtx.getStatus() == ServiceStatus.Starting)
throw new IgniteException("Failed to start Camel streamer (CamelContext already started or starting).");
log = getIgnite().log();
// Instantiate the Camel endpoint.
try {
- endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, endpointUri);
+ endpoint = CamelContextHelper.getMandatoryEndpoint(camelCtx, endpointUri);
}
catch (NoSuchEndpointException e) {
- throw new IgniteException("Failed to start Camel streamer (exception while instantiating endpoint).", e);
+ U.error(log, e);
+
+ throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
}
// Create the Camel consumer.
@@ -106,48 +107,51 @@ public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implement
consumer = endpoint.createConsumer(this);
}
catch (Exception e) {
- throw new IgniteException("Failed to start Camel streamer (exception while creating consumer).", e);
+ U.error(log, e);
+
+ throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
}
// Start the Camel services.
try {
- ServiceHelper.startServices(camelContext, endpoint, consumer);
+ ServiceHelper.startServices(camelCtx, endpoint, consumer);
}
catch (Exception e) {
- throw new IgniteException("Failed to start Camel streamer (exception while starting services).", e);
+ U.error(log, e);
+
+ throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
}
- log.info("Started Camel streamer consuming from endpoint URI: " + endpointUri);
+ U.log(log, "Started Camel streamer consuming from endpoint URI: " + endpointUri);
}
/**
* Stops the streamer.
*
- * @throws IgniteException
+ * @throws IgniteException In cases if failed to stop the streamer.
*/
public void stop() throws IgniteException {
// If the Camel Context is stopping or stopped, reject this call to stop.
- if (camelContext.getStatus() == ServiceStatus.Stopped || camelContext.getStatus() == ServiceStatus.Stopping)
+ if (camelCtx.getStatus() == ServiceStatus.Stopped || camelCtx.getStatus() == ServiceStatus.Stopping)
throw new IgniteException("Failed to stop Camel streamer (CamelContext already stopped or stopping).");
// Stop Camel services.
try {
- ServiceHelper.stopAndShutdownServices(camelContext, endpoint, consumer);
- } catch (Exception e) {
- throw new IgniteException("Failed to stop Camel streamer (exception while stopping services).", e);
+ ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
+ }
+ catch (Exception e) {
+ throw new IgniteException("Failed to stop Camel streamer [errMsg=" + e.getMessage() + ']');
}
- log.info("Stopped Camel streamer, formerly consuming from endpoint URI: " + endpointUri);
+ U.log(log, "Stopped Camel streamer, formerly consuming from endpoint URI: " + endpointUri);
}
-
/**
* Processes the incoming {@link Exchange} and adds the tuple(s) to the underlying streamer.
*
* @param exchange The Camel Exchange.
*/
- @Override
- public void process(Exchange exchange) throws Exception {
+ @Override public void process(Exchange exchange) throws Exception {
// Extract and insert the tuple(s).
if (getMultipleTupleExtractor() == null) {
Map.Entry<K, V> entry = getSingleTupleExtractor().extract(exchange);
@@ -159,35 +163,30 @@ public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implement
}
// If the user has set a response processor, invoke it before finishing.
- if (responseProcessor != null)
- responseProcessor.process(exchange);
-
+ if (resProc != null)
+ resProc.process(exchange);
}
- // ----------------------------
- // Getters and setters
- // ----------------------------
-
/**
- * Gets the underlying {@link CamelContext}, whether created automatically by Ignite or the context specified
- * by the user.
+ * Gets the underlying {@link CamelContext}, whether created automatically by Ignite or the context specified by the
+ * user.
*
* @return The Camel Context.
*/
public CamelContext getCamelContext() {
- return camelContext;
+ return camelCtx;
}
/**
* Explicitly sets the {@link CamelContext} to use.
*
- * Doing so gives the user the opportunity to attach custom components, a
- * {@link org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
+ * Doing so gives the user the opportunity to attach custom components, a {@link
+ * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
*
- * @param camelContext The Camel Context to use. In most cases, an instance of {@link DefaultCamelContext}.
+ * @param camelCtx The Camel Context to use. In most cases, an instance of {@link DefaultCamelContext}.
*/
- public void setCamelContext(CamelContext camelContext) {
- this.camelContext = camelContext;
+ public void setCamelContext(CamelContext camelCtx) {
+ this.camelCtx = camelCtx;
}
/**
@@ -214,15 +213,15 @@ public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implement
* @return The {@link Processor}.
*/
public Processor getResponseProcessor() {
- return responseProcessor;
+ return resProc;
}
/**
* Sets the {@link Processor} used to generate the response.
*
- * @param responseProcessor The {@link Processor}.
+ * @param resProc The {@link Processor}.
*/
- public void setResponseProcessor(Processor responseProcessor) {
- this.responseProcessor = responseProcessor;
+ public void setResponseProcessor(Processor resProc) {
+ this.resProc = resProc;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1d4e035/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
index 84d5154..545a1ec 100644
--- a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
+++ b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
@@ -65,7 +65,6 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
* Test class for {@link CamelStreamer}.
*/
public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
-
/** text/plain media type. */
private static final MediaType TEXT_PLAIN = MediaType.parse("text/plain;charset=utf-8");
@@ -82,7 +81,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
private String url;
/** The UUID of the currently active remote listener. */
- private UUID remoteListener;
+ private UUID remoteLsnr;
/** The OkHttpClient. */
private OkHttpClient httpClient = new OkHttpClient();
@@ -99,7 +98,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
}
@Before @SuppressWarnings("unchecked")
- public void beforeTest() throws Exception {
+ @Override public void beforeTest() throws Exception {
grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
// find an available local port
@@ -115,7 +114,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
}
@After
- public void afterTest() throws Exception {
+ @Override public void afterTest() throws Exception {
try {
streamer.stop();
}
@@ -377,7 +376,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
}
};
- remoteListener = ignite.events(ignite.cluster().forCacheNodes(null))
+ remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(null))
.remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
return latch;
@@ -398,7 +397,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
assertEquals(cnt, cache.size(CachePeekMode.ALL));
// remove the event listener
- grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
+ grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteLsnr);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1d4e035/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index e7d224c..efd1b18 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -37,7 +37,6 @@ import org.apache.ignite.IgniteDataStreamer;
* </ol>
*/
public abstract class StreamAdapter<T, K, V> {
-
/** Tuple extractor extracting a single tuple from an event */
private StreamSingleTupleExtractor<T, K, V> singleTupleExtractor;
@@ -99,9 +98,9 @@ public abstract class StreamAdapter<T, K, V> {
*/
@Deprecated
public StreamTupleExtractor<T, K, V> getTupleExtractor() {
- if (singleTupleExtractor instanceof StreamTupleExtractor) {
+ if (singleTupleExtractor instanceof StreamTupleExtractor)
return (StreamTupleExtractor) singleTupleExtractor;
- }
+
throw new IllegalArgumentException("This method is deprecated and only relevant if using an old " +
"StreamTupleExtractor; use getSingleTupleExtractor instead");
}
@@ -112,9 +111,9 @@ public abstract class StreamAdapter<T, K, V> {
*/
@Deprecated
public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
- if (multipleTupleExtractor != null) {
+ if (multipleTupleExtractor != null)
throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
- }
+
this.singleTupleExtractor = extractor;
}
@@ -129,9 +128,9 @@ public abstract class StreamAdapter<T, K, V> {
* @param singleTupleExtractor Extractor for key-value tuples from messages.
*/
public void setSingleTupleExtractor(StreamSingleTupleExtractor<T, K, V> singleTupleExtractor) {
- if (multipleTupleExtractor != null) {
+ if (multipleTupleExtractor != null)
throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
- }
+
this.singleTupleExtractor = singleTupleExtractor;
}
@@ -146,9 +145,9 @@ public abstract class StreamAdapter<T, K, V> {
* @param multipleTupleExtractor Extractor for 1:n tuple extraction.
*/
public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
- if (singleTupleExtractor != null) {
+ if (singleTupleExtractor != null)
throw new IllegalArgumentException("Single tuple extractor already set; cannot set both types at once.");
- }
+
this.multipleTupleExtractor = multipleTupleExtractor;
}
@@ -188,4 +187,4 @@ public abstract class StreamAdapter<T, K, V> {
}
}
-}
\ No newline at end of file
+}