You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/11/04 15:28:17 UTC

[37/37] ignite git commit: ignite-1790: minor fixes in order to align the code with the guidelines

ignite-1790: minor fixes in order to align the code with the guidelines


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1d4e035
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1d4e035
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1d4e035

Branch: refs/heads/ignite-1790
Commit: a1d4e03577b804b350a9ecec9dd0f8bb8a8b2c0f
Parents: 7eb1b9d
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Nov 4 17:26:01 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Nov 4 17:26:01 2015 +0300

----------------------------------------------------------------------
 .../ignite/stream/camel/CamelStreamer.java      | 91 ++++++++++----------
 .../stream/camel/IgniteCamelStreamerTest.java   | 11 ++-
 .../org/apache/ignite/stream/StreamAdapter.java | 19 ++--
 3 files changed, 59 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a1d4e035/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
index 38b16f3..17bbadb 100644
--- a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
+++ b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.stream.camel;
 
 import java.util.Map;
-
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
@@ -32,6 +31,7 @@ import org.apache.camel.util.ServiceHelper;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.stream.StreamAdapter;
 import org.apache.ignite.stream.StreamMultipleTupleExtractor;
 import org.apache.ignite.stream.StreamSingleTupleExtractor;
@@ -39,22 +39,21 @@ import org.apache.ignite.stream.StreamSingleTupleExtractor;
 /**
  * This streamer consumes messages from an Apache Camel consumer endpoint and feeds them into an Ignite data streamer.
  *
- * The only mandatory properties are {@link #endpointUri} and the appropriate stream tuple extractor
- * (either {@link StreamSingleTupleExtractor} or {@link StreamMultipleTupleExtractor)}.
+ * The only mandatory properties are {@link #endpointUri} and the appropriate stream tuple extractor (either {@link
+ * StreamSingleTupleExtractor} or {@link StreamMultipleTupleExtractor)}.
  *
- * The user can also provide a custom {@link CamelContext} in case they want to attach custom components, a
- * {@link org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
+ * The user can also provide a custom {@link CamelContext} in case they want to attach custom components, a {@link
+ * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
  *
  * @see <a href="http://camel.apache.org">Apache Camel</a>
  * @see <a href="http://camel.apache.org/components.html">Apache Camel components</a>
  */
 public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implements Processor {
-
     /** Logger. */
     private IgniteLogger log;
 
     /** The Camel Context. */
-    private CamelContext camelContext;
+    private CamelContext camelCtx;
 
     /** The endpoint URI to consume from. */
     private String endpointUri;
@@ -66,12 +65,12 @@ public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implement
     private Consumer consumer;
 
     /** A {@link Processor} to generate the response. */
-    private Processor responseProcessor;
+    private Processor resProc;
 
     /**
      * Starts the streamer.
      *
-     * @throws IgniteException
+     * @throws IgniteException In cases when failed to start the streamer.
      */
     public void start() throws IgniteException {
         // Ensure that the endpoint URI is provided.
@@ -84,21 +83,23 @@ public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implement
             "cannot provide both single and multiple tuple extractor");
 
         // If a custom CamelContext is not provided, initialize one.
-        if (camelContext == null)
-            camelContext = new DefaultCamelContext();
+        if (camelCtx == null)
+            camelCtx = new DefaultCamelContext();
 
         // If the Camel Context is starting or started, reject this call to start.
-        if (camelContext.getStatus() == ServiceStatus.Started || camelContext.getStatus() == ServiceStatus.Starting)
+        if (camelCtx.getStatus() == ServiceStatus.Started || camelCtx.getStatus() == ServiceStatus.Starting)
             throw new IgniteException("Failed to start Camel streamer (CamelContext already started or starting).");
 
         log = getIgnite().log();
 
         // Instantiate the Camel endpoint.
         try {
-            endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, endpointUri);
+            endpoint = CamelContextHelper.getMandatoryEndpoint(camelCtx, endpointUri);
         }
         catch (NoSuchEndpointException e) {
-            throw new IgniteException("Failed to start Camel streamer (exception while instantiating endpoint).", e);
+            U.error(log, e);
+
+            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
         }
 
         // Create the Camel consumer.
@@ -106,48 +107,51 @@ public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implement
             consumer = endpoint.createConsumer(this);
         }
         catch (Exception e) {
-            throw new IgniteException("Failed to start Camel streamer (exception while creating consumer).", e);
+            U.error(log, e);
+
+            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
         }
 
         // Start the Camel services.
         try {
-            ServiceHelper.startServices(camelContext, endpoint, consumer);
+            ServiceHelper.startServices(camelCtx, endpoint, consumer);
         }
         catch (Exception e) {
-            throw new IgniteException("Failed to start Camel streamer (exception while starting services).", e);
+            U.error(log, e);
+
+            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
         }
 
-        log.info("Started Camel streamer consuming from endpoint URI: " + endpointUri);
+        U.log(log, "Started Camel streamer consuming from endpoint URI: " + endpointUri);
     }
 
     /**
      * Stops the streamer.
      *
-     * @throws IgniteException
+     * @throws IgniteException In cases if failed to stop the streamer.
      */
     public void stop() throws IgniteException {
         // If the Camel Context is stopping or stopped, reject this call to stop.
-        if (camelContext.getStatus() == ServiceStatus.Stopped || camelContext.getStatus() == ServiceStatus.Stopping)
+        if (camelCtx.getStatus() == ServiceStatus.Stopped || camelCtx.getStatus() == ServiceStatus.Stopping)
             throw new IgniteException("Failed to stop Camel streamer (CamelContext already stopped or stopping).");
 
         // Stop Camel services.
         try {
-            ServiceHelper.stopAndShutdownServices(camelContext, endpoint, consumer);
-        } catch (Exception e) {
-            throw new IgniteException("Failed to stop Camel streamer (exception while stopping services).", e);
+            ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
+        }
+        catch (Exception e) {
+            throw new IgniteException("Failed to stop Camel streamer [errMsg=" + e.getMessage() + ']');
         }
 
-        log.info("Stopped Camel streamer, formerly consuming from endpoint URI: " + endpointUri);
+        U.log(log, "Stopped Camel streamer, formerly consuming from endpoint URI: " + endpointUri);
     }
 
-
     /**
      * Processes the incoming {@link Exchange} and adds the tuple(s) to the underlying streamer.
      *
      * @param exchange The Camel Exchange.
      */
-    @Override
-    public void process(Exchange exchange) throws Exception {
+    @Override public void process(Exchange exchange) throws Exception {
         // Extract and insert the tuple(s).
         if (getMultipleTupleExtractor() == null) {
             Map.Entry<K, V> entry = getSingleTupleExtractor().extract(exchange);
@@ -159,35 +163,30 @@ public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implement
         }
 
         // If the user has set a response processor, invoke it before finishing.
-        if (responseProcessor != null)
-            responseProcessor.process(exchange);
-
+        if (resProc != null)
+            resProc.process(exchange);
     }
 
-    // ----------------------------
-    //  Getters and setters
-    // ----------------------------
-
     /**
-     * Gets the underlying {@link CamelContext}, whether created automatically by Ignite or the context specified
-     * by the user.
+     * Gets the underlying {@link CamelContext}, whether created automatically by Ignite or the context specified by the
+     * user.
      *
      * @return The Camel Context.
      */
     public CamelContext getCamelContext() {
-        return camelContext;
+        return camelCtx;
     }
 
     /**
      * Explicitly sets the {@link CamelContext} to use.
      *
-     * Doing so gives the user the opportunity to attach custom components, a
-     * {@link org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
+     * Doing so gives the user the opportunity to attach custom components, a {@link
+     * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
      *
-     * @param camelContext The Camel Context to use. In most cases, an instance of {@link DefaultCamelContext}.
+     * @param camelCtx The Camel Context to use. In most cases, an instance of {@link DefaultCamelContext}.
      */
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
+    public void setCamelContext(CamelContext camelCtx) {
+        this.camelCtx = camelCtx;
     }
 
     /**
@@ -214,15 +213,15 @@ public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implement
      * @return The {@link Processor}.
      */
     public Processor getResponseProcessor() {
-        return responseProcessor;
+        return resProc;
     }
 
     /**
      * Sets the {@link Processor} used to generate the response.
      *
-     * @param responseProcessor The {@link Processor}.
+     * @param resProc The {@link Processor}.
      */
-    public void setResponseProcessor(Processor responseProcessor) {
-        this.responseProcessor = responseProcessor;
+    public void setResponseProcessor(Processor resProc) {
+        this.resProc = resProc;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1d4e035/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
index 84d5154..545a1ec 100644
--- a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
+++ b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
@@ -65,7 +65,6 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
  * Test class for {@link CamelStreamer}.
  */
 public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
-
     /** text/plain media type. */
     private static final MediaType TEXT_PLAIN = MediaType.parse("text/plain;charset=utf-8");
 
@@ -82,7 +81,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
     private String url;
 
     /** The UUID of the currently active remote listener. */
-    private UUID remoteListener;
+    private UUID remoteLsnr;
 
     /** The OkHttpClient. */
     private OkHttpClient httpClient = new OkHttpClient();
@@ -99,7 +98,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
     }
 
     @Before @SuppressWarnings("unchecked")
-    public void beforeTest() throws Exception {
+    @Override public void beforeTest() throws Exception {
         grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
 
         // find an available local port
@@ -115,7 +114,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
     }
 
     @After
-    public void afterTest() throws Exception {
+    @Override public void afterTest() throws Exception {
         try {
             streamer.stop();
         }
@@ -377,7 +376,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
             }
         };
 
-        remoteListener = ignite.events(ignite.cluster().forCacheNodes(null))
+        remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(null))
             .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
 
         return latch;
@@ -398,7 +397,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
         assertEquals(cnt, cache.size(CachePeekMode.ALL));
 
         // remove the event listener
-        grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
+        grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteLsnr);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1d4e035/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index e7d224c..efd1b18 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -37,7 +37,6 @@ import org.apache.ignite.IgniteDataStreamer;
  * </ol>
  */
 public abstract class StreamAdapter<T, K, V> {
-
     /** Tuple extractor extracting a single tuple from an event */
     private StreamSingleTupleExtractor<T, K, V> singleTupleExtractor;
 
@@ -99,9 +98,9 @@ public abstract class StreamAdapter<T, K, V> {
      */
     @Deprecated
     public StreamTupleExtractor<T, K, V> getTupleExtractor() {
-        if (singleTupleExtractor instanceof StreamTupleExtractor) {
+        if (singleTupleExtractor instanceof StreamTupleExtractor)
             return (StreamTupleExtractor) singleTupleExtractor;
-        }
+
         throw new IllegalArgumentException("This method is deprecated and only relevant if using an old " +
             "StreamTupleExtractor; use getSingleTupleExtractor instead");
     }
@@ -112,9 +111,9 @@ public abstract class StreamAdapter<T, K, V> {
      */
     @Deprecated
     public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
-        if (multipleTupleExtractor != null) {
+        if (multipleTupleExtractor != null)
             throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
-        }
+
         this.singleTupleExtractor = extractor;
     }
 
@@ -129,9 +128,9 @@ public abstract class StreamAdapter<T, K, V> {
      * @param singleTupleExtractor Extractor for key-value tuples from messages.
      */
     public void setSingleTupleExtractor(StreamSingleTupleExtractor<T, K, V> singleTupleExtractor) {
-        if (multipleTupleExtractor != null) {
+        if (multipleTupleExtractor != null)
             throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
-        }
+
         this.singleTupleExtractor = singleTupleExtractor;
     }
 
@@ -146,9 +145,9 @@ public abstract class StreamAdapter<T, K, V> {
      * @param multipleTupleExtractor Extractor for 1:n tuple extraction.
      */
     public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
-        if (singleTupleExtractor != null) {
+        if (singleTupleExtractor != null)
             throw new IllegalArgumentException("Single tuple extractor already set; cannot set both types at once.");
-        }
+
         this.multipleTupleExtractor = multipleTupleExtractor;
     }
 
@@ -188,4 +187,4 @@ public abstract class StreamAdapter<T, K, V> {
         }
     }
 
-}
\ No newline at end of file
+}