You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/12/23 11:07:44 UTC
[2/2] camel git commit: CAMEL-8149: Fixed CS
CAMEL-8149: Fixed CS
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16d35285
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16d35285
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16d35285
Branch: refs/heads/master
Commit: 16d352853c03f1fd7380c57917f3412e7b5be55f
Parents: 69de2e3
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Dec 23 11:05:47 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Dec 23 11:05:47 2014 +0100
----------------------------------------------------------------------
.../elasticsearch/ElasticsearchProducer.java | 209 +++++++++----------
.../BulkRequestAggregationStrategy.java | 60 +++---
.../ElasticsearchActionRequestConverter.java | 127 +++++------
3 files changed, 183 insertions(+), 213 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/16d35285/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
index 5a25507..3d6e604 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
@@ -34,126 +34,113 @@ import org.elasticsearch.client.Client;
*/
public class ElasticsearchProducer extends DefaultProducer {
- public ElasticsearchProducer(ElasticsearchEndpoint endpoint) {
- super(endpoint);
- }
+ public ElasticsearchProducer(ElasticsearchEndpoint endpoint) {
+ super(endpoint);
+ }
- @Override
- public ElasticsearchEndpoint getEndpoint() {
- return (ElasticsearchEndpoint) super.getEndpoint();
- }
+ @Override
+ public ElasticsearchEndpoint getEndpoint() {
+ return (ElasticsearchEndpoint) super.getEndpoint();
+ }
- private String resolveOperation(Exchange exchange) {
- // 1. Operation can be driven by either (in order of preference):
- // a. If the body is an ActionRequest the operation is set by the type
- // of request.
- // b. If the body is not an ActionRequest, the operation is set by the
- // header if it exists.
- // c. If neither the operation can not be derived from the body or
- // header, the configuration is used.
- // In the event we can't discover the operation from a, b or c we throw
- // an error.
+ private String resolveOperation(Exchange exchange) {
+ // 1. Operation can be driven by either (in order of preference):
+ // a. If the body is an ActionRequest the operation is set by the type
+ // of request.
+ // b. If the body is not an ActionRequest, the operation is set by the
+ // header if it exists.
+ // c. If the operation can not be derived from either the body or
+ // the header, the configuration is used.
+ // In the event we can't discover the operation from a, b or c we throw
+ // an error.
- Object request = exchange.getIn().getBody();
- if (request instanceof IndexRequest)
- return ElasticsearchConfiguration.OPERATION_INDEX;
- else if (request instanceof GetRequest)
- return ElasticsearchConfiguration.OPERATION_GET_BY_ID;
- else if (request instanceof BulkRequest)
- return ElasticsearchConfiguration.OPERATION_BULK_INDEX;
- else if (request instanceof DeleteRequest)
- return ElasticsearchConfiguration.OPERATION_DELETE;
+ Object request = exchange.getIn().getBody();
+ if (request instanceof IndexRequest) {
+ return ElasticsearchConfiguration.OPERATION_INDEX;
+ } else if (request instanceof GetRequest) {
+ return ElasticsearchConfiguration.OPERATION_GET_BY_ID;
+ } else if (request instanceof BulkRequest) {
+ return ElasticsearchConfiguration.OPERATION_BULK_INDEX;
+ } else if (request instanceof DeleteRequest) {
+ return ElasticsearchConfiguration.OPERATION_DELETE;
+ }
- String operationConfig = exchange.getIn().getHeader(
- ElasticsearchConfiguration.PARAM_OPERATION, String.class);
- if (operationConfig == null) {
- operationConfig = getEndpoint().getConfig().getOperation();
- }
- if (operationConfig == null)
- throw new IllegalArgumentException(
- ElasticsearchConfiguration.PARAM_OPERATION + " value '"
- + operationConfig + "' is not supported");
- return operationConfig;
- }
+ String operationConfig = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_OPERATION, String.class);
+ if (operationConfig == null) {
+ operationConfig = getEndpoint().getConfig().getOperation();
+ }
+ if (operationConfig == null) {
+ throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value '" + operationConfig + "' is not supported");
+ }
+ return operationConfig;
+ }
- public void process(Exchange exchange) throws Exception {
- // 2. Index and type will be set by:
- // a. If the incoming body is already an action request
- // b. If the body is not an action request we will use headers if they
- // are set.
- // c. If the body is not an action request and the headers aren't set we
- // will use the configuration.
- // No error is thrown by the component in the event none of the above
- // conditions are met. The java es client
- // will throw.
+ public void process(Exchange exchange) throws Exception {
+ // 2. Index and type will be set by:
+ // a. If the incoming body is already an action request
+ // b. If the body is not an action request we will use headers if they
+ // are set.
+ // c. If the body is not an action request and the headers aren't set we
+ // will use the configuration.
+ // No error is thrown by the component in the event none of the above
+ // conditions are met. The java es client
+ // will throw.
- Message message = exchange.getIn();
- ElasticsearchConfiguration config = getEndpoint().getConfig();
- final String operation = resolveOperation(exchange);
+ Message message = exchange.getIn();
+ final String operation = resolveOperation(exchange);
- // Set the index/type headers on the exchange if necessary. This is used
- // for type conversion.
- boolean configIndexName = false;
- String indexName = message.getHeader(
- ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class);
- if (indexName == null) {
- message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME,
- getEndpoint().getConfig().getIndexName());
- configIndexName = true;
- }
+ // Set the index/type headers on the exchange if necessary. This is used
+ // for type conversion.
+ boolean configIndexName = false;
+ String indexName = message.getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class);
+ if (indexName == null) {
+ message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, getEndpoint().getConfig().getIndexName());
+ configIndexName = true;
+ }
- boolean configIndexType = false;
- String indexType = message.getHeader(
- ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class);
- if (indexType == null) {
- message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE,
- getEndpoint().getConfig().getIndexName());
- configIndexType = true;
- }
+ boolean configIndexType = false;
+ String indexType = message.getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class);
+ if (indexType == null) {
+ message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, getEndpoint().getConfig().getIndexName());
+ configIndexType = true;
+ }
- Client client = getEndpoint().getClient();
- if (ElasticsearchConfiguration.OPERATION_INDEX.equals(operation)) {
- IndexRequest indexRequest = message.getBody(IndexRequest.class);
- message.setBody(client.index(indexRequest).actionGet().getId());
- } else if (ElasticsearchConfiguration.OPERATION_GET_BY_ID
- .equals(operation)) {
- GetRequest getRequest = message.getBody(GetRequest.class);
- message.setBody(client.get(getRequest));
- } else if (ElasticsearchConfiguration.OPERATION_BULK_INDEX
- .equals(operation)) {
- BulkRequest bulkRequest = message.getBody(BulkRequest.class);
- List<String> indexedIds = new LinkedList<String>();
- for (BulkItemResponse response : client.bulk(bulkRequest)
- .actionGet().getItems()) {
- indexedIds.add(response.getId());
- }
- log.debug("List of successfully indexed document ids : {}",
- indexedIds);
- message.setBody(indexedIds);
- } else if (ElasticsearchConfiguration.OPERATION_DELETE
- .equals(operation)) {
- DeleteRequest deleteRequest = message.getBody(DeleteRequest.class);
- message.setBody(client.delete(deleteRequest).actionGet());
- } else {
- throw new IllegalArgumentException(
- ElasticsearchConfiguration.PARAM_OPERATION + " value '"
- + operation + "' is not supported");
- }
+ Client client = getEndpoint().getClient();
+ if (ElasticsearchConfiguration.OPERATION_INDEX.equals(operation)) {
+ IndexRequest indexRequest = message.getBody(IndexRequest.class);
+ message.setBody(client.index(indexRequest).actionGet().getId());
+ } else if (ElasticsearchConfiguration.OPERATION_GET_BY_ID.equals(operation)) {
+ GetRequest getRequest = message.getBody(GetRequest.class);
+ message.setBody(client.get(getRequest));
+ } else if (ElasticsearchConfiguration.OPERATION_BULK_INDEX.equals(operation)) {
+ BulkRequest bulkRequest = message.getBody(BulkRequest.class);
+ List<String> indexedIds = new LinkedList<String>();
+ for (BulkItemResponse response : client.bulk(bulkRequest).actionGet().getItems()) {
+ indexedIds.add(response.getId());
+ }
+ log.debug("List of successfully indexed document ids : {}", indexedIds);
+ message.setBody(indexedIds);
+ } else if (ElasticsearchConfiguration.OPERATION_DELETE.equals(operation)) {
+ DeleteRequest deleteRequest = message.getBody(DeleteRequest.class);
+ message.setBody(client.delete(deleteRequest).actionGet());
+ } else {
+ throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value '" + operation + "' is not supported");
+ }
- // If we set params via the configuration on this exchange, remove them
- // now. This preserves legacy behavior for this component and enables a
- // use case where one message can be sent to multiple elasticsearch
- // endpoints where the user is relying on the endpoint configuration
- // (index/type) rather than header values. If we do not clear this out
- // sending the same message (index request, for example) to multiple
- // elasticsearch endpoints would have the effect overriding any
- // subsequent endpoint index/type with the first endpoint index/type.
- if (configIndexName) {
- message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME);
- }
+ // If we set params via the configuration on this exchange, remove them
+ // now. This preserves legacy behavior for this component and enables a
+ // use case where one message can be sent to multiple elasticsearch
+ // endpoints where the user is relying on the endpoint configuration
+ // (index/type) rather than header values. If we do not clear this out
+ // sending the same message (index request, for example) to multiple
+ // elasticsearch endpoints would have the effect of overriding any
+ // subsequent endpoint index/type with the first endpoint index/type.
+ if (configIndexName) {
+ message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME);
+ }
- if (configIndexType) {
- message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE);
- }
- }
+ if (configIndexType) {
+ message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/16d35285/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
index f32fb0a..7b938a0 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
@@ -1,11 +1,13 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,7 +17,7 @@
package org.apache.camel.component.elasticsearch.aggregation;
import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.InvalidPayloadRuntimeException;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -23,35 +25,27 @@ import org.elasticsearch.action.bulk.BulkRequest;
/**
* Aggregates two {@link ActionRequest}s into a single {@link BulkRequest}.
*/
-public class BulkRequestAggregationStrategy implements AggregationStrategy
-{
- /////////////////////////////////////////////////////////////////////////
- // Public methods
- /////////////////////////////////////////////////////////////////////////
+public class BulkRequestAggregationStrategy implements AggregationStrategy {
- @Override
- public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
- {
- // Don't use getBody(Class<T>) here as we don't want to coerce the body type using a type converter.
- Object objBody = newExchange.getIn().getBody();
- if (!(objBody instanceof ActionRequest))
- throw new RuntimeCamelException("Invalid body type for elastisearch bulk request aggregation strategy: " +
- objBody.getClass().getName());
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ // Don't use getBody(Class<T>) here as we don't want to coerce the body type using a type converter.
+ Object objBody = newExchange.getIn().getBody();
+ if (!(objBody instanceof ActionRequest)) {
+ throw new InvalidPayloadRuntimeException(newExchange, ActionRequest.class);
+ }
- ActionRequest newBody = (ActionRequest)objBody;
- BulkRequest request = null;
- if (oldExchange == null)
- {
- request = new BulkRequest();
- request.add(newBody);
- newExchange.getIn().setBody(request);
- return newExchange;
- }
- else
- {
- request = oldExchange.getIn().getBody(BulkRequest.class);
- request.add(newBody);
- return oldExchange;
- }
- }
+ ActionRequest newBody = (ActionRequest) objBody;
+ BulkRequest request;
+ if (oldExchange == null) {
+ request = new BulkRequest();
+ request.add(newBody);
+ newExchange.getIn().setBody(request);
+ return newExchange;
+ } else {
+ request = oldExchange.getIn().getBody(BulkRequest.class);
+ request.add(newBody);
+ return oldExchange;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/16d35285/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
index 2ad0d92..a62f218 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.elasticsearch.converter;
import java.util.List;
import java.util.Map;
+
import org.apache.camel.Converter;
import org.apache.camel.Exchange;
import org.apache.camel.component.elasticsearch.ElasticsearchConfiguration;
@@ -28,80 +29,68 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
@Converter
-public class ElasticsearchActionRequestConverter {
-
- // Index requests
- private static IndexRequest createIndexRequest(Object document,
- Exchange exchange) {
- IndexRequest indexRequest = new IndexRequest();
- if (document instanceof byte[]) {
- indexRequest.source((byte[]) document);
- } else if (document instanceof Map) {
- indexRequest.source((Map<String, Object>) document);
- } else if (document instanceof String) {
- indexRequest.source((String) document);
- } else if (document instanceof XContentBuilder) {
- indexRequest.source((XContentBuilder) document);
- } else {
- return null;
- }
-
- return indexRequest.index(
- exchange.getIn().getHeader(
- ElasticsearchConfiguration.PARAM_INDEX_NAME,
- String.class)).type(
- exchange.getIn().getHeader(
- ElasticsearchConfiguration.PARAM_INDEX_TYPE,
- String.class));
- }
-
- @Converter
- public static IndexRequest toIndexRequest(Object document, Exchange exchange) {
- if (document == null)
- return null;
+public final class ElasticsearchActionRequestConverter {
- return createIndexRequest(document, exchange).id(
- exchange.getIn()
- .getHeader(ElasticsearchConfiguration.PARAM_INDEX_ID,
- String.class));
- }
+ private ElasticsearchActionRequestConverter() {
+ }
- @Converter
- public static GetRequest toGetRequest(String id, Exchange exchange) {
- if (id == null)
- return null;
+ // Index requests
+ private static IndexRequest createIndexRequest(Object document, Exchange exchange) {
+ IndexRequest indexRequest = new IndexRequest();
+ if (document instanceof byte[]) {
+ indexRequest.source((byte[]) document);
+ } else if (document instanceof Map) {
+ indexRequest.source((Map<String, Object>) document);
+ } else if (document instanceof String) {
+ indexRequest.source((String) document);
+ } else if (document instanceof XContentBuilder) {
+ indexRequest.source((XContentBuilder) document);
+ } else {
+ return null;
+ }
- return new GetRequest(exchange.getIn().getHeader(
- ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class))
- .type(exchange.getIn().getHeader(
- ElasticsearchConfiguration.PARAM_INDEX_TYPE,
- String.class)).id(id);
- }
+ return indexRequest.index(
+ exchange.getIn().getHeader(
+ ElasticsearchConfiguration.PARAM_INDEX_NAME,
+ String.class)).type(
+ exchange.getIn().getHeader(
+ ElasticsearchConfiguration.PARAM_INDEX_TYPE,
+ String.class));
+ }
- @Converter
- public static DeleteRequest toDeleteRequest(String id, Exchange exchange) {
- if (id == null)
- return null;
+ @Converter
+ public static IndexRequest toIndexRequest(Object document, Exchange exchange) {
+ return createIndexRequest(document, exchange)
+ .id(exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_ID, String.class));
+ }
- return new DeleteRequest()
- .index(exchange.getIn().getHeader(
- ElasticsearchConfiguration.PARAM_INDEX_NAME,
- String.class))
- .type(exchange.getIn().getHeader(
- ElasticsearchConfiguration.PARAM_INDEX_TYPE,
- String.class)).id(id);
- }
+ @Converter
+ public static GetRequest toGetRequest(String id, Exchange exchange) {
+ return new GetRequest(exchange.getIn().getHeader(
+ ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class))
+ .type(exchange.getIn().getHeader(
+ ElasticsearchConfiguration.PARAM_INDEX_TYPE,
+ String.class)).id(id);
+ }
- @Converter
- public static BulkRequest toBulkRequest(List<Object> documents,
- Exchange exchange) {
- if (documents == null)
- return null;
+ @Converter
+ public static DeleteRequest toDeleteRequest(String id, Exchange exchange) {
+ return new DeleteRequest()
+ .index(exchange.getIn().getHeader(
+ ElasticsearchConfiguration.PARAM_INDEX_NAME,
+ String.class))
+ .type(exchange.getIn().getHeader(
+ ElasticsearchConfiguration.PARAM_INDEX_TYPE,
+ String.class)).id(id);
+ }
- BulkRequest request = new BulkRequest();
- for (Object document : documents) {
- request.add(createIndexRequest(document, exchange));
- }
- return request;
- }
+ @Converter
+ public static BulkRequest toBulkRequest(List<Object> documents,
+ Exchange exchange) {
+ BulkRequest request = new BulkRequest();
+ for (Object document : documents) {
+ request.add(createIndexRequest(document, exchange));
+ }
+ return request;
+ }
}