You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/12/23 11:07:44 UTC

[2/2] camel git commit: CAMEL-8149: Fixed CS

CAMEL-8149: Fixed CS


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16d35285
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16d35285
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16d35285

Branch: refs/heads/master
Commit: 16d352853c03f1fd7380c57917f3412e7b5be55f
Parents: 69de2e3
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Dec 23 11:05:47 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Dec 23 11:05:47 2014 +0100

----------------------------------------------------------------------
 .../elasticsearch/ElasticsearchProducer.java    | 209 +++++++++----------
 .../BulkRequestAggregationStrategy.java         |  60 +++---
 .../ElasticsearchActionRequestConverter.java    | 127 +++++------
 3 files changed, 183 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/16d35285/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
index 5a25507..3d6e604 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
@@ -34,126 +34,113 @@ import org.elasticsearch.client.Client;
  */
 public class ElasticsearchProducer extends DefaultProducer {
 
-	public ElasticsearchProducer(ElasticsearchEndpoint endpoint) {
-		super(endpoint);
-	}
+    public ElasticsearchProducer(ElasticsearchEndpoint endpoint) {
+        super(endpoint);
+    }
 
-	@Override
-	public ElasticsearchEndpoint getEndpoint() {
-		return (ElasticsearchEndpoint) super.getEndpoint();
-	}
+    @Override
+    public ElasticsearchEndpoint getEndpoint() {
+        return (ElasticsearchEndpoint) super.getEndpoint();
+    }
 
-	private String resolveOperation(Exchange exchange) {
-		// 1. Operation can be driven by either (in order of preference):
-		// a. If the body is an ActionRequest the operation is set by the type
-		// of request.
-		// b. If the body is not an ActionRequest, the operation is set by the
-		// header if it exists.
-		// c. If neither the operation can not be derived from the body or
-		// header, the configuration is used.
-		// In the event we can't discover the operation from a, b or c we throw
-		// an error.
+    private String resolveOperation(Exchange exchange) {
+        // 1. Operation can be driven by either (in order of preference):
+        // a. If the body is an ActionRequest the operation is set by the type
+        // of request.
+        // b. If the body is not an ActionRequest, the operation is set by the
+        // header if it exists.
+        // c. If neither the operation can not be derived from the body or
+        // header, the configuration is used.
+        // In the event we can't discover the operation from a, b or c we throw
+        // an error.
 
-		Object request = exchange.getIn().getBody();
-		if (request instanceof IndexRequest)
-			return ElasticsearchConfiguration.OPERATION_INDEX;
-		else if (request instanceof GetRequest)
-			return ElasticsearchConfiguration.OPERATION_GET_BY_ID;
-		else if (request instanceof BulkRequest)
-			return ElasticsearchConfiguration.OPERATION_BULK_INDEX;
-		else if (request instanceof DeleteRequest)
-			return ElasticsearchConfiguration.OPERATION_DELETE;
+        Object request = exchange.getIn().getBody();
+        if (request instanceof IndexRequest) {
+            return ElasticsearchConfiguration.OPERATION_INDEX;
+        } else if (request instanceof GetRequest) {
+            return ElasticsearchConfiguration.OPERATION_GET_BY_ID;
+        } else if (request instanceof BulkRequest) {
+            return ElasticsearchConfiguration.OPERATION_BULK_INDEX;
+        } else if (request instanceof DeleteRequest) {
+            return ElasticsearchConfiguration.OPERATION_DELETE;
+        }
 
-		String operationConfig = exchange.getIn().getHeader(
-				ElasticsearchConfiguration.PARAM_OPERATION, String.class);
-		if (operationConfig == null) {
-			operationConfig = getEndpoint().getConfig().getOperation();
-		}
-		if (operationConfig == null)
-			throw new IllegalArgumentException(
-					ElasticsearchConfiguration.PARAM_OPERATION + " value '"
-							+ operationConfig + "' is not supported");
-		return operationConfig;
-	}
+        String operationConfig = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_OPERATION, String.class);
+        if (operationConfig == null) {
+            operationConfig = getEndpoint().getConfig().getOperation();
+        }
+        if (operationConfig == null) {
+            throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value '" + operationConfig + "' is not supported");
+        }
+        return operationConfig;
+    }
 
-	public void process(Exchange exchange) throws Exception {
-		// 2. Index and type will be set by:
-		// a. If the incoming body is already an action request
-		// b. If the body is not an action request we will use headers if they
-		// are set.
-		// c. If the body is not an action request and the headers aren't set we
-		// will use the configuration.
-		// No error is thrown by the component in the event none of the above
-		// conditions are met. The java es client
-		// will throw.
+    public void process(Exchange exchange) throws Exception {
+        // 2. Index and type will be set by:
+        // a. If the incoming body is already an action request
+        // b. If the body is not an action request we will use headers if they
+        // are set.
+        // c. If the body is not an action request and the headers aren't set we
+        // will use the configuration.
+        // No error is thrown by the component in the event none of the above
+        // conditions are met. The java es client
+        // will throw.
 
-		Message message = exchange.getIn();
-		ElasticsearchConfiguration config = getEndpoint().getConfig();
-		final String operation = resolveOperation(exchange);
+        Message message = exchange.getIn();
+        final String operation = resolveOperation(exchange);
 
-		// Set the index/type headers on the exchange if necessary. This is used
-		// for type conversion.
-		boolean configIndexName = false;
-		String indexName = message.getHeader(
-				ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class);
-		if (indexName == null) {
-			message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME,
-					getEndpoint().getConfig().getIndexName());
-			configIndexName = true;
-		}
+        // Set the index/type headers on the exchange if necessary. This is used
+        // for type conversion.
+        boolean configIndexName = false;
+        String indexName = message.getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class);
+        if (indexName == null) {
+            message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, getEndpoint().getConfig().getIndexName());
+            configIndexName = true;
+        }
 
-		boolean configIndexType = false;
-		String indexType = message.getHeader(
-				ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class);
-		if (indexType == null) {
-			message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE,
-					getEndpoint().getConfig().getIndexName());
-			configIndexType = true;
-		}
+        boolean configIndexType = false;
+        String indexType = message.getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class);
+        if (indexType == null) {
+            message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, getEndpoint().getConfig().getIndexName());
+            configIndexType = true;
+        }
 
-		Client client = getEndpoint().getClient();
-		if (ElasticsearchConfiguration.OPERATION_INDEX.equals(operation)) {
-			IndexRequest indexRequest = message.getBody(IndexRequest.class);
-			message.setBody(client.index(indexRequest).actionGet().getId());
-		} else if (ElasticsearchConfiguration.OPERATION_GET_BY_ID
-				.equals(operation)) {
-			GetRequest getRequest = message.getBody(GetRequest.class);
-			message.setBody(client.get(getRequest));
-		} else if (ElasticsearchConfiguration.OPERATION_BULK_INDEX
-				.equals(operation)) {
-			BulkRequest bulkRequest = message.getBody(BulkRequest.class);
-			List<String> indexedIds = new LinkedList<String>();
-			for (BulkItemResponse response : client.bulk(bulkRequest)
-					.actionGet().getItems()) {
-				indexedIds.add(response.getId());
-			}
-			log.debug("List of successfully indexed document ids : {}",
-					indexedIds);
-			message.setBody(indexedIds);
-		} else if (ElasticsearchConfiguration.OPERATION_DELETE
-				.equals(operation)) {
-			DeleteRequest deleteRequest = message.getBody(DeleteRequest.class);
-			message.setBody(client.delete(deleteRequest).actionGet());
-		} else {
-			throw new IllegalArgumentException(
-					ElasticsearchConfiguration.PARAM_OPERATION + " value '"
-							+ operation + "' is not supported");
-		}
+        Client client = getEndpoint().getClient();
+        if (ElasticsearchConfiguration.OPERATION_INDEX.equals(operation)) {
+            IndexRequest indexRequest = message.getBody(IndexRequest.class);
+            message.setBody(client.index(indexRequest).actionGet().getId());
+        } else if (ElasticsearchConfiguration.OPERATION_GET_BY_ID.equals(operation)) {
+            GetRequest getRequest = message.getBody(GetRequest.class);
+            message.setBody(client.get(getRequest));
+        } else if (ElasticsearchConfiguration.OPERATION_BULK_INDEX.equals(operation)) {
+            BulkRequest bulkRequest = message.getBody(BulkRequest.class);
+            List<String> indexedIds = new LinkedList<String>();
+            for (BulkItemResponse response : client.bulk(bulkRequest).actionGet().getItems()) {
+                indexedIds.add(response.getId());
+            }
+            log.debug("List of successfully indexed document ids : {}", indexedIds);
+            message.setBody(indexedIds);
+        } else if (ElasticsearchConfiguration.OPERATION_DELETE.equals(operation)) {
+            DeleteRequest deleteRequest = message.getBody(DeleteRequest.class);
+            message.setBody(client.delete(deleteRequest).actionGet());
+        } else {
+            throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value '" + operation + "' is not supported");
+        }
 
-		// If we set params via the configuration on this exchange, remove them
-		// now. This preserves legacy behavior for this component and enables a
-		// use case where one message can be sent to multiple elasticsearch
-		// endpoints where the user is relying on the endpoint configuration
-		// (index/type) rather than header values. If we do not clear this out
-		// sending the same message (index request, for example) to multiple
-		// elasticsearch endpoints would have the effect overriding any
-		// subsequent endpoint index/type with the first endpoint index/type.
-		if (configIndexName) {
-			message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME);
-		}
+        // If we set params via the configuration on this exchange, remove them
+        // now. This preserves legacy behavior for this component and enables a
+        // use case where one message can be sent to multiple elasticsearch
+        // endpoints where the user is relying on the endpoint configuration
+        // (index/type) rather than header values. If we do not clear this out
+        // sending the same message (index request, for example) to multiple
+        // elasticsearch endpoints would have the effect overriding any
+        // subsequent endpoint index/type with the first endpoint index/type.
+        if (configIndexName) {
+            message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME);
+        }
 
-		if (configIndexType) {
-			message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE);
-		}
-	}
+        if (configIndexType) {
+            message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/16d35285/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
index f32fb0a..7b938a0 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
@@ -1,11 +1,13 @@
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,7 +17,7 @@
 package org.apache.camel.component.elasticsearch.aggregation;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.InvalidPayloadRuntimeException;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -23,35 +25,27 @@ import org.elasticsearch.action.bulk.BulkRequest;
 /**
  * Aggregates two {@link ActionRequest}s into a single {@link BulkRequest}.
  */
-public class BulkRequestAggregationStrategy implements AggregationStrategy
-{
-   /////////////////////////////////////////////////////////////////////////
-   // Public methods
-   /////////////////////////////////////////////////////////////////////////
+public class BulkRequestAggregationStrategy implements AggregationStrategy {
 
-   @Override
-   public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
-   {
-      // Don't use getBody(Class<T>) here as we don't want to coerce the body type using a type converter.
-      Object objBody = newExchange.getIn().getBody();
-      if (!(objBody instanceof ActionRequest))
-         throw new RuntimeCamelException("Invalid body type for elastisearch bulk request aggregation strategy: " +
-            objBody.getClass().getName());
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // Don't use getBody(Class<T>) here as we don't want to coerce the body type using a type converter.
+        Object objBody = newExchange.getIn().getBody();
+        if (!(objBody instanceof ActionRequest)) {
+            throw new InvalidPayloadRuntimeException(newExchange, ActionRequest.class);
+        }
 
-      ActionRequest newBody = (ActionRequest)objBody;
-      BulkRequest request = null;
-      if (oldExchange == null)
-      {
-         request = new BulkRequest();
-         request.add(newBody);
-         newExchange.getIn().setBody(request);
-         return newExchange;
-      }
-      else
-      {
-         request = oldExchange.getIn().getBody(BulkRequest.class);
-         request.add(newBody);
-         return oldExchange;
-      }
-   }
+        ActionRequest newBody = (ActionRequest) objBody;
+        BulkRequest request;
+        if (oldExchange == null) {
+            request = new BulkRequest();
+            request.add(newBody);
+            newExchange.getIn().setBody(request);
+            return newExchange;
+        } else {
+            request = oldExchange.getIn().getBody(BulkRequest.class);
+            request.add(newBody);
+            return oldExchange;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/16d35285/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
index 2ad0d92..a62f218 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.elasticsearch.converter;
 
 import java.util.List;
 import java.util.Map;
+
 import org.apache.camel.Converter;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.elasticsearch.ElasticsearchConfiguration;
@@ -28,80 +29,68 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
 @Converter
-public class ElasticsearchActionRequestConverter {
-
-	// Index requests
-	private static IndexRequest createIndexRequest(Object document,
-			Exchange exchange) {
-		IndexRequest indexRequest = new IndexRequest();
-		if (document instanceof byte[]) {
-			indexRequest.source((byte[]) document);
-		} else if (document instanceof Map) {
-			indexRequest.source((Map<String, Object>) document);
-		} else if (document instanceof String) {
-			indexRequest.source((String) document);
-		} else if (document instanceof XContentBuilder) {
-			indexRequest.source((XContentBuilder) document);
-		} else {
-			return null;
-		}
-
-		return indexRequest.index(
-				exchange.getIn().getHeader(
-						ElasticsearchConfiguration.PARAM_INDEX_NAME,
-						String.class)).type(
-				exchange.getIn().getHeader(
-						ElasticsearchConfiguration.PARAM_INDEX_TYPE,
-						String.class));
-	}
-
-	@Converter
-	public static IndexRequest toIndexRequest(Object document, Exchange exchange) {
-		if (document == null)
-			return null;
+public final class ElasticsearchActionRequestConverter {
 
-		return createIndexRequest(document, exchange).id(
-				exchange.getIn()
-						.getHeader(ElasticsearchConfiguration.PARAM_INDEX_ID,
-								String.class));
-	}
+    private ElasticsearchActionRequestConverter() {
+    }
 
-	@Converter
-	public static GetRequest toGetRequest(String id, Exchange exchange) {
-		if (id == null)
-			return null;
+    // Index requests
+    private static IndexRequest createIndexRequest(Object document, Exchange exchange) {
+        IndexRequest indexRequest = new IndexRequest();
+        if (document instanceof byte[]) {
+            indexRequest.source((byte[]) document);
+        } else if (document instanceof Map) {
+            indexRequest.source((Map<String, Object>) document);
+        } else if (document instanceof String) {
+            indexRequest.source((String) document);
+        } else if (document instanceof XContentBuilder) {
+            indexRequest.source((XContentBuilder) document);
+        } else {
+            return null;
+        }
 
-		return new GetRequest(exchange.getIn().getHeader(
-				ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class))
-				.type(exchange.getIn().getHeader(
-						ElasticsearchConfiguration.PARAM_INDEX_TYPE,
-						String.class)).id(id);
-	}
+        return indexRequest.index(
+                exchange.getIn().getHeader(
+                        ElasticsearchConfiguration.PARAM_INDEX_NAME,
+                        String.class)).type(
+                exchange.getIn().getHeader(
+                        ElasticsearchConfiguration.PARAM_INDEX_TYPE,
+                        String.class));
+    }
 
-	@Converter
-	public static DeleteRequest toDeleteRequest(String id, Exchange exchange) {
-		if (id == null)
-			return null;
+    @Converter
+    public static IndexRequest toIndexRequest(Object document, Exchange exchange) {
+        return createIndexRequest(document, exchange)
+                .id(exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_ID, String.class));
+    }
 
-		return new DeleteRequest()
-				.index(exchange.getIn().getHeader(
-						ElasticsearchConfiguration.PARAM_INDEX_NAME,
-						String.class))
-				.type(exchange.getIn().getHeader(
-						ElasticsearchConfiguration.PARAM_INDEX_TYPE,
-						String.class)).id(id);
-	}
+    @Converter
+    public static GetRequest toGetRequest(String id, Exchange exchange) {
+        return new GetRequest(exchange.getIn().getHeader(
+                ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class))
+                .type(exchange.getIn().getHeader(
+                        ElasticsearchConfiguration.PARAM_INDEX_TYPE,
+                        String.class)).id(id);
+    }
 
-	@Converter
-	public static BulkRequest toBulkRequest(List<Object> documents,
-			Exchange exchange) {
-		if (documents == null)
-			return null;
+    @Converter
+    public static DeleteRequest toDeleteRequest(String id, Exchange exchange) {
+        return new DeleteRequest()
+                .index(exchange.getIn().getHeader(
+                        ElasticsearchConfiguration.PARAM_INDEX_NAME,
+                        String.class))
+                .type(exchange.getIn().getHeader(
+                        ElasticsearchConfiguration.PARAM_INDEX_TYPE,
+                        String.class)).id(id);
+    }
 
-		BulkRequest request = new BulkRequest();
-		for (Object document : documents) {
-			request.add(createIndexRequest(document, exchange));
-		}
-		return request;
-	}
+    @Converter
+    public static BulkRequest toBulkRequest(List<Object> documents,
+                                            Exchange exchange) {
+        BulkRequest request = new BulkRequest();
+        for (Object document : documents) {
+            request.add(createIndexRequest(document, exchange));
+        }
+        return request;
+    }
 }