You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by jn...@apache.org on 2015/08/26 14:41:54 UTC

svn commit: r1697911 - in /nutch/trunk: ./ src/plugin/ src/plugin/indexer-cloudsearch/ src/plugin/indexer-cloudsearch/src/ src/plugin/indexer-cloudsearch/src/java/ src/plugin/indexer-cloudsearch/src/java/org/ src/plugin/indexer-cloudsearch/src/java/org...

Author: jnioche
Date: Wed Aug 26 12:41:53 2015
New Revision: 1697911

URL: http://svn.apache.org/r1697911
Log:
CloudSearch indexer

Added:
    nutch/trunk/src/plugin/indexer-cloudsearch/
    nutch/trunk/src/plugin/indexer-cloudsearch/README.md
    nutch/trunk/src/plugin/indexer-cloudsearch/build.xml
    nutch/trunk/src/plugin/indexer-cloudsearch/createCSDomain.sh
    nutch/trunk/src/plugin/indexer-cloudsearch/ivy.xml
    nutch/trunk/src/plugin/indexer-cloudsearch/plugin.xml
    nutch/trunk/src/plugin/indexer-cloudsearch/src/
    nutch/trunk/src/plugin/indexer-cloudsearch/src/java/
    nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/
    nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/
    nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/
    nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/
    nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/
    nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchConstants.java
    nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchIndexWriter.java
    nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchUtils.java
Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/src/plugin/build.xml

Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1697911&r1=1697910&r2=1697911&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Wed Aug 26 12:41:53 2015
@@ -2,6 +2,8 @@ Nutch Change Log
   
 Nutch Current Development 1.11-SNAPSHOT
 
+* NUTCH-1517 CloudSearch indexer (jnioche)
+
 * NUTCH-2085 Upgrade Guava (markus)
 
 * NUTCH-2084 SegmentMerger to report missing input dirs (markus)

Modified: nutch/trunk/src/plugin/build.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/build.xml?rev=1697911&r1=1697910&r2=1697911&view=diff
==============================================================================
--- nutch/trunk/src/plugin/build.xml (original)
+++ nutch/trunk/src/plugin/build.xml Wed Aug 26 12:41:53 2015
@@ -37,6 +37,7 @@
      <ant dir="index-static" target="deploy"/>
      <ant dir="index-metadata" target="deploy"/>
      <ant dir="mimetype-filter" target="deploy"/>
+     <ant dir="indexer-cloudsearch" target="deploy"/>
      <ant dir="indexer-dummy" target="deploy"/>
      <ant dir="indexer-elastic" target="deploy"/>
      <ant dir="indexer-solr" target="deploy"/>
@@ -143,6 +144,7 @@
     <ant dir="index-replace" target="clean"/>
     <ant dir="index-metadata" target="clean"/>
     <ant dir="mimetype-filter" target="clean"/>
+    <ant dir="indexer-cloudsearch" target="clean"/>
     <ant dir="indexer-dummy" target="clean"/>
     <ant dir="indexer-elastic" target="clean"/>
     <ant dir="indexer-solr" target="clean"/>

Added: nutch/trunk/src/plugin/indexer-cloudsearch/README.md
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-cloudsearch/README.md?rev=1697911&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-cloudsearch/README.md (added)
+++ nutch/trunk/src/plugin/indexer-cloudsearch/README.md Wed Aug 26 12:41:53 2015
@@ -0,0 +1,58 @@
+AWS CloudSearch plugin for Nutch 
+================================
+
+See [http://aws.amazon.com/cloudsearch/] for information on AWS CloudSearch.
+
+Steps to use :
+
+From runtime/local/bin
+
+* Configure the AWS credentials 
+
+Edit `~/.aws/credentials`, see [http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html] for details. Note that this should not be necessary when running Nutch on EC2.
+
+* Edit ../conf/nutch-site.xml and check that 'plugin.includes' contains 'indexer-cloudsearch'. 
+
+* (Optional) Test the indexing 
+
+`./nutch indexchecker -D doIndex=true -D cloudsearch.batch.dump=true "http://nutch.apache.org/"`
+
+If the agent name hasn't been configured in nutch-site.xml, it can be added on the command line with `-D http.agent.name=whateverValueDescribesYouBest`.
+
+You should see the fields extracted for indexing printed on the console.
+
+Setting the `cloudsearch.batch.dump` parameter to true dumps each batch to the local temp dir instead of sending it to CloudSearch. The files have the prefix "CloudSearch_", e.g. `/tmp/CloudSearch_4822180575734804454.json`. Such a file can be used as a template when defining the fields during domain creation (see below).
+
+* Create a CloudSearch domain
+
+This can be done using the web console [https://eu-west-1.console.aws.amazon.com/cloudsearch/home?region=eu-west-1#]. You can use the temp file generated above to bootstrap the field definition. 
+
+You can also create the domain using the AWS CLI [http://docs.aws.amazon.com/cloudsearch/latest/developerguide/creating-domains.html] and the example script `createCSDomain.sh` provided. This script is merely a starting point, which you should further improve and fine-tune. 
+
+Note that the creation of the domain can take some time. Once it is complete, note the document endpoint, or alternatively verify the region and domain name.
+
+* Edit ../conf/nutch-site.xml and add `cloudsearch.endpoint` and `cloudsearch.region`. 
+
+* Re-test the indexing
+
+`./nutch indexchecker -D doIndex=true "http://nutch.apache.org/"`
+
+and check in the CloudSearch console that the document has been successfully indexed.
+
+Additional parameters
+
+* `cloudsearch.batch.maxSize` \: limits the batches sent to CloudSearch to at most N documents. The size limits still apply regardless: a batch is never larger than 5 MB, and documents larger than 1 MB are skipped.
+
+* `cloudsearch.batch.dump` \: see above. Stores the JSON representation of the document batch in the local temp dir, useful for bootstrapping the index definition.
+
+Note
+
+The CloudSearchIndexWriter will log any errors while sending the batches to CloudSearch and will resume the process without breaking. This means that you might not get all the documents in the index. You should check the log files for errors. Using small batch sizes will limit the number of documents skipped in case of error.
+
+Any fields not defined in the CloudSearch domain will be ignored by the CloudSearchIndexWriter. Again, the logs will contain a trace of any field names skipped.
+
+
+
+  
+
+

Added: nutch/trunk/src/plugin/indexer-cloudsearch/build.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-cloudsearch/build.xml?rev=1697911&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-cloudsearch/build.xml (added)
+++ nutch/trunk/src/plugin/indexer-cloudsearch/build.xml Wed Aug 26 12:41:53 2015
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-cloudsearch" default="jar-core">
+
+  <import file="../build-plugin.xml" />
+
+</project>

Added: nutch/trunk/src/plugin/indexer-cloudsearch/createCSDomain.sh
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-cloudsearch/createCSDomain.sh?rev=1697911&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-cloudsearch/createCSDomain.sh (added)
+++ nutch/trunk/src/plugin/indexer-cloudsearch/createCSDomain.sh Wed Aug 26 12:41:53 2015
@@ -0,0 +1,22 @@
+# example of domain configuration for CloudSearch
+
+DOMAIN="$1"
+
+if [ "$DOMAIN" = "" ]; then
+    echo "Need to specify a domain name as argument"
+    exit -1;
+fi
+
+aws cloudsearch create-domain --domain-name $DOMAIN
+
+aws cloudsearch define-index-field --domain-name $DOMAIN --name boost --type double --sort-enabled true --facet-enabled false
+aws cloudsearch define-index-field --domain-name $DOMAIN --name content --type text --sort-enabled false
+aws cloudsearch define-index-field --domain-name $DOMAIN --name digest --type literal --sort-enabled false --facet-enabled false
+aws cloudsearch define-index-field --domain-name $DOMAIN --name host --type literal --sort-enabled false --facet-enabled true
+aws cloudsearch define-index-field --domain-name $DOMAIN --name id --type literal --sort-enabled false --facet-enabled false
+aws cloudsearch define-index-field --domain-name $DOMAIN --name segment --type literal --sort-enabled true --facet-enabled false
+aws cloudsearch define-index-field --domain-name $DOMAIN --name title --type text --sort-enabled false
+aws cloudsearch define-index-field --domain-name $DOMAIN --name tstamp --type date --sort-enabled true --facet-enabled false
+aws cloudsearch define-index-field --domain-name $DOMAIN --name url --type literal --sort-enabled false --facet-enabled false
+
+

Added: nutch/trunk/src/plugin/indexer-cloudsearch/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-cloudsearch/ivy.xml?rev=1697911&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-cloudsearch/ivy.xml (added)
+++ nutch/trunk/src/plugin/indexer-cloudsearch/ivy.xml Wed Aug 26 12:41:53 2015
@@ -0,0 +1,41 @@
+<?xml version="1.0" ?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<ivy-module version="1.0">
+  <info organisation="org.apache.nutch" module="${ant.project.name}">
+    <license name="Apache 2.0"/>
+    <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org"/>
+    <description>
+        Apache Nutch
+    </description>
+  </info>
+
+  <configurations>
+    <include file="../../../ivy/ivy-configurations.xml"/>
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master"/>
+  </publications>
+
+  <dependencies>
+	<dependency org="com.amazonaws" name="aws-java-sdk-cloudsearch" rev="1.10.0"/>
+  </dependencies>
+  
+</ivy-module>

Added: nutch/trunk/src/plugin/indexer-cloudsearch/plugin.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-cloudsearch/plugin.xml?rev=1697911&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-cloudsearch/plugin.xml (added)
+++ nutch/trunk/src/plugin/indexer-cloudsearch/plugin.xml Wed Aug 26 12:41:53 2015
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<plugin id="indexer-cloudsearch" name="CloudSearchIndexWriter" version="1.0.0"
+  provider-name="nutch.apache.org">
+
+  <runtime>
+    <library name="indexer-cloudsearch.jar">
+      <export name="*" />
+    </library>
+
+     <library name="aws-java-sdk-cloudsearch-1.10.0.jar"/>
+     <library name="aws-java-sdk-core-1.10.0.jar"/>
+     <library name="commons-codec-1.6.jar"/>
+     <library name="commons-logging-1.1.3.jar"/>
+     <library name="httpclient-4.3.6.jar"/>
+     <library name="httpcore-4.3.3.jar"/>
+     <library name="jackson-annotations-2.5.0.jar"/>
+     <library name="jackson-core-2.5.3.jar"/>
+     <library name="jackson-databind-2.5.3.jar"/>
+     <library name="joda-time-2.8.jar"/>
+
+  </runtime>
+
+  <requires>
+    <import plugin="nutch-extensionpoints" />
+  </requires>
+
+  <extension id="org.apache.nutch.indexer.cloudsearch"
+    name="CloudSearch Index Writer"
+    point="org.apache.nutch.indexer.IndexWriter">
+    <implementation id="CloudSearchIndexWriter"
+      class="org.apache.nutch.indexwriter.cloudsearch.CloudSearchIndexWriter" />
+  </extension>
+
+</plugin>

Added: nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchConstants.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchConstants.java?rev=1697911&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchConstants.java (added)
+++ nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchConstants.java Wed Aug 26 12:41:53 2015
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexwriter.cloudsearch;
+
+/**
+ * Configuration keys read by the CloudSearch index writer. Every key starts
+ * with {@link #CLOUDSEARCH_PREFIX}. Interface fields are implicitly
+ * {@code public static final}.
+ */
+public interface CloudSearchConstants {
+  /** Prefix shared by all CloudSearch configuration keys. */
+  String CLOUDSEARCH_PREFIX = "cloudsearch.";
+
+  /** URL of the CloudSearch domain's document endpoint. */
+  String ENDPOINT = CLOUDSEARCH_PREFIX + "endpoint";
+
+  /** Name of the CloudSearch region. */
+  String REGION = CLOUDSEARCH_PREFIX + "region";
+
+  /** When true, batches are dumped to local temp files instead of being sent. */
+  String BATCH_DUMP = CLOUDSEARCH_PREFIX + "batch.dump";
+
+  /** Maximum number of documents per batch; a non-positive value means no limit. */
+  String MAX_DOCS_BATCH = CLOUDSEARCH_PREFIX + "batch.maxSize";
+}

Added: nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchIndexWriter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchIndexWriter.java?rev=1697911&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchIndexWriter.java (added)
+++ nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchIndexWriter.java Wed Aug 26 12:41:53 2015
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexwriter.cloudsearch;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TimeZone;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.IndexWriter;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.NutchField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.cloudsearchdomain.AmazonCloudSearchDomainClient;
+import com.amazonaws.services.cloudsearchdomain.model.ContentType;
+import com.amazonaws.services.cloudsearchdomain.model.UploadDocumentsRequest;
+import com.amazonaws.services.cloudsearchdomain.model.UploadDocumentsResult;
+import com.amazonaws.services.cloudsearchv2.AmazonCloudSearchClient;
+import com.amazonaws.services.cloudsearchv2.model.DescribeDomainsRequest;
+import com.amazonaws.services.cloudsearchv2.model.DescribeDomainsResult;
+import com.amazonaws.services.cloudsearchv2.model.DescribeIndexFieldsRequest;
+import com.amazonaws.services.cloudsearchv2.model.DescribeIndexFieldsResult;
+import com.amazonaws.services.cloudsearchv2.model.DomainStatus;
+import com.amazonaws.services.cloudsearchv2.model.IndexFieldStatus;
+import com.amazonaws.util.json.JSONException;
+import com.amazonaws.util.json.JSONObject;
+
+/**
+ * Writes documents to AWS CloudSearch.
+ * <p>
+ * Documents are serialised as entries of a CloudSearch JSON batch and buffered
+ * in memory. A batch is flushed when adding a document would exceed the
+ * maximum batch size, when {@code cloudsearch.batch.maxSize} documents have
+ * been buffered, or on {@link #commit()} / {@link #close()}. Errors while
+ * sending a batch are logged and the batch is dropped. When
+ * {@code cloudsearch.batch.dump} is true, batches are written to local temp
+ * files instead of being sent and no CloudSearch client is created.
+ */
+public class CloudSearchIndexWriter implements IndexWriter {
+  public static final Logger LOG = LoggerFactory
+      .getLogger(CloudSearchIndexWriter.class);
+
+  /** Maximum size in bytes of a batch sent to CloudSearch (5 MB). */
+  private static final int MAX_SIZE_BATCH_BYTES = 5242880;
+
+  /** Maximum size in bytes of a single serialised document (1 MB). */
+  private static final int MAX_SIZE_DOC_BYTES = 1048576;
+
+  /**
+   * Pattern used for date values. The literal 'Z' claims UTC, so dates must be
+   * formatted with a UTC time zone (see {@link #formatDate(Date)}).
+   */
+  private static final String DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+
+  private AmazonCloudSearchDomainClient client;
+
+  private int maxDocsInBatch = -1;
+
+  /** Current batch: an open JSON array that commit() closes with ']'. */
+  private StringBuilder buffer;
+
+  /** UTF-8 length of {@link #buffer}, tracked so it is not re-encoded per doc. */
+  private int bufferLengthBytes = 0;
+
+  private int numDocsInBatch = 0;
+
+  private boolean dumpBatchFilesToTemp = false;
+
+  private Configuration conf;
+
+  /** Index fields defined in the CloudSearch domain: name to type. */
+  private Map<String, String> csfields = new HashMap<String, String>();
+
+  private String regionName;
+
+  /**
+   * Starts a new batch and, unless batches are only dumped to disk, resolves
+   * the domain behind the configured endpoint and loads its index fields.
+   *
+   * @throws RuntimeException
+   *           if the endpoint is not set or matches no domain
+   */
+  @Override
+  public void open(JobConf job, String name) throws IOException {
+    LOG.debug("CloudSearchIndexWriter.open() name={} ", name);
+
+    maxDocsInBatch = job.getInt(CloudSearchConstants.MAX_DOCS_BATCH, -1);
+
+    resetBuffer();
+
+    dumpBatchFilesToTemp = job.getBoolean(CloudSearchConstants.BATCH_DUMP,
+        false);
+
+    if (dumpBatchFilesToTemp) {
+      // only dumping to local file
+      // no more config required
+      return;
+    }
+
+    String endpoint = job.get(CloudSearchConstants.ENDPOINT);
+
+    if (StringUtils.isBlank(endpoint)) {
+      throw new RuntimeException("endpoint not set for CloudSearch");
+    }
+
+    AmazonCloudSearchClient cl = new AmazonCloudSearchClient();
+    try {
+      if (StringUtils.isNotBlank(regionName)) {
+        cl.setRegion(RegionUtils.getRegion(regionName));
+      }
+
+      String domainName = findDomainName(cl, endpoint);
+      if (StringUtils.isBlank(domainName)) {
+        throw new RuntimeException(
+            "No domain name found for CloudSearch endpoint " + endpoint);
+      }
+
+      loadIndexFields(cl, domainName);
+    } finally {
+      // the configuration client is only needed while opening; release it
+      cl.shutdown();
+    }
+
+    client = new AmazonCloudSearchDomainClient();
+    client.setEndpoint(endpoint);
+  }
+
+  /**
+   * Returns the name of the domain whose document endpoint equals
+   * {@code endpoint}, or null if none matches.
+   */
+  private static String findDomainName(AmazonCloudSearchClient cl,
+      String endpoint) {
+    DescribeDomainsResult domains = cl
+        .describeDomains(new DescribeDomainsRequest());
+    for (DomainStatus ds : domains.getDomainStatusList()) {
+      // null-safe: a domain may not report a document service / endpoint
+      if (ds.getDocService() != null
+          && endpoint.equals(ds.getDocService().getEndpoint())) {
+        return ds.getDomainName();
+      }
+    }
+    return null;
+  }
+
+  /** Records the name and type of every index field of the domain. */
+  private void loadIndexFields(AmazonCloudSearchClient cl, String domainName) {
+    DescribeIndexFieldsResult indexDescription = cl.describeIndexFields(
+        new DescribeIndexFieldsRequest().withDomainName(domainName));
+    for (IndexFieldStatus ifs : indexDescription.getIndexFields()) {
+      String indexname = ifs.getOptions().getIndexFieldName();
+      String indextype = ifs.getOptions().getIndexFieldType();
+      LOG.info("CloudSearch index name {} of type {}", indexname, indextype);
+      csfields.put(indexname, indextype);
+    }
+  }
+
+  /** Adds a delete operation for the document derived from {@code url}. */
+  @Override
+  public void delete(String url) throws IOException {
+    try {
+      JSONObject doc_builder = new JSONObject();
+
+      doc_builder.put("type", "delete");
+
+      // generate the id from the url, exactly as write() does
+      doc_builder.put("id", CloudSearchUtils.getID(url));
+
+      // add to the batch
+      addToBatch(doc_builder.toString(2), url);
+
+    } catch (JSONException e) {
+      LOG.error("Exception caught while building JSON object", e);
+    }
+  }
+
+  /** CloudSearch "add" replaces an existing document, so update is a write. */
+  @Override
+  public void update(NutchDocument doc) throws IOException {
+    write(doc);
+  }
+
+  /**
+   * Adds an "add" operation for {@code doc}. Fields not defined in the domain
+   * are skipped (unless dumping batches), dates are sent as UTC ISO-8601
+   * strings and strings are stripped of characters CloudSearch rejects.
+   */
+  @Override
+  public void write(NutchDocument doc) throws IOException {
+    // Use the raw first value rather than NutchField.toString(), so that the
+    // ID is computed from the same string delete(url) receives.
+    Object urlValue = doc.getFieldValue("url");
+    if (urlValue == null) {
+      LOG.error("Document without url field - skipping");
+      return;
+    }
+    String url = urlValue.toString();
+
+    try {
+      JSONObject doc_builder = new JSONObject();
+
+      doc_builder.put("type", "add");
+
+      // generate the id from the url
+      doc_builder.put("id", CloudSearchUtils.getID(url));
+
+      JSONObject fields = new JSONObject();
+
+      for (final Entry<String, NutchField> e : doc) {
+        String fieldname = cleanFieldName(e.getKey());
+        String type = csfields.get(fieldname);
+
+        // undefined in index
+        if (!dumpBatchFilesToTemp && type == null) {
+          LOG.info(
+              "Field {} not defined in CloudSearch domain for {} - skipping.",
+              fieldname, url);
+          continue;
+        }
+
+        List<Object> values = e.getValue().getValues();
+        for (Object value : values) {
+          if (value instanceof Date) {
+            // dates are sent as ISO-8601 strings in UTC
+            value = formatDate((Date) value);
+          } else if (value instanceof String) {
+            // drop characters that are not legal for CloudSearch
+            value = CloudSearchUtils.stripNonCharCodepoints((String) value);
+          }
+          // accumulate turns repeated values into a JSON array
+          fields.accumulate(fieldname, value);
+        }
+      }
+
+      doc_builder.put("fields", fields);
+
+      addToBatch(doc_builder.toString(2), url);
+
+    } catch (JSONException e) {
+      LOG.error("Exception caught while building JSON object", e);
+    }
+  }
+
+  /**
+   * Appends a serialised document to the current batch. Documents over the
+   * per-document limit are logged and skipped. The current batch is flushed
+   * first if the document would not fit, and again afterwards if the
+   * configured maximum number of documents is reached.
+   */
+  private void addToBatch(String currentDoc, String url) throws IOException {
+    int currentDocLength = currentDoc.getBytes(StandardCharsets.UTF_8).length;
+
+    // check that the doc is not too large -> skip it if it does
+    if (currentDocLength > MAX_SIZE_DOC_BYTES) {
+      LOG.error("Doc too large. currentDoc.length {} : {}", currentDocLength,
+          url);
+      return;
+    }
+
+    LOG.debug("currentDoc.length {}, buffer length {}", currentDocLength,
+        bufferLengthBytes);
+
+    // +2 reserves room for the separating ',' and the closing ']'
+    if (currentDocLength + 2 + bufferLengthBytes >= MAX_SIZE_BATCH_BYTES) {
+      // flush the previous batch; this doc starts a new one
+      commit();
+    }
+
+    if (numDocsInBatch != 0) {
+      buffer.append(',');
+      bufferLengthBytes++;
+    }
+    buffer.append(currentDoc);
+    bufferLengthBytes += currentDocLength;
+    numDocsInBatch++;
+
+    // have we reached the max number of docs in a batch after adding
+    // this doc?
+    if (maxDocsInBatch > 0 && numDocsInBatch >= maxDocsInBatch) {
+      commit();
+    }
+  }
+
+  /**
+   * Closes and flushes the current batch, either to a temp file or to
+   * CloudSearch. The buffer is reset even if the flush fails.
+   */
+  @Override
+  public void commit() throws IOException {
+
+    // nothing to do
+    if (numDocsInBatch == 0) {
+      return;
+    }
+
+    // close the array
+    buffer.append(']');
+
+    LOG.info("Sending {} docs to CloudSearch", numDocsInBatch);
+
+    byte[] bb = buffer.toString().getBytes(StandardCharsets.UTF_8);
+
+    try {
+      if (dumpBatchFilesToTemp) {
+        dumpBatch(bb);
+      } else {
+        sendBatch(bb);
+      }
+    } finally {
+      resetBuffer();
+    }
+  }
+
+  /** Writes a batch to a new temp file; failures are logged, not thrown. */
+  private void dumpBatch(byte[] bb) {
+    try {
+      File temp = File.createTempFile("CloudSearch_", ".json");
+      FileUtils.writeByteArrayToFile(temp, bb);
+      LOG.info("Wrote batch file {}", temp.getAbsolutePath());
+    } catch (IOException e1) {
+      LOG.error("Exception while generating batch file", e1);
+    }
+  }
+
+  /** Uploads a batch to CloudSearch; failures are logged, not thrown. */
+  private void sendBatch(byte[] bb) {
+    try (InputStream inputStream = new ByteArrayInputStream(bb)) {
+      UploadDocumentsRequest batch = new UploadDocumentsRequest();
+      batch.setContentLength((long) bb.length);
+      batch.setContentType(ContentType.Applicationjson);
+      batch.setDocuments(inputStream);
+      UploadDocumentsResult result = client.uploadDocuments(batch);
+      LOG.debug("CloudSearch upload status: {}", result.getStatus());
+    } catch (Exception e) {
+      LOG.error("Exception while sending batch", e);
+      LOG.error(buffer.toString());
+    }
+  }
+
+  /** Starts a new, empty batch. */
+  private void resetBuffer() {
+    buffer = new StringBuilder(MAX_SIZE_BATCH_BYTES).append('[');
+    bufferLengthBytes = 1;
+    numDocsInBatch = 0;
+  }
+
+  /**
+   * Formats a date in UTC. A new formatter is created per call because
+   * SimpleDateFormat is not thread-safe.
+   */
+  private static String formatDate(Date date) {
+    SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN, Locale.ROOT);
+    format.setTimeZone(TimeZone.getTimeZone("UTC"));
+    return format.format(date);
+  }
+
+  /** Flushes any unsent documents and releases the CloudSearch client. */
+  @Override
+  public void close() throws IOException {
+    // This will flush any unsent documents.
+    commit();
+    // no client is created when batches are only dumped to local files
+    if (client != null) {
+      client.shutdown();
+      client = null;
+    }
+  }
+
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Stores the configuration and reads the region.
+   *
+   * @throws RuntimeException
+   *           if no endpoint is configured and batch dumping is disabled
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    String endpoint = conf.get(CloudSearchConstants.ENDPOINT);
+    boolean dumpOnly = conf.getBoolean(CloudSearchConstants.BATCH_DUMP,
+        false);
+    this.regionName = conf.get(CloudSearchConstants.REGION);
+
+    if (StringUtils.isBlank(endpoint) && !dumpOnly) {
+      String message = "Missing CloudSearch endpoint. Should be set via -D "
+          + CloudSearchConstants.ENDPOINT + " or in nutch-site.xml";
+      message += "\n" + describe();
+      LOG.error(message);
+      throw new RuntimeException(message);
+    }
+  }
+
+  /** Describes the configuration keys and any values currently set. */
+  public String describe() {
+    String configuredEndpoint = null;
+    String configuredRegion = null;
+
+    // get the values set in the conf
+    if (getConf() != null) {
+      configuredEndpoint = getConf().get(CloudSearchConstants.ENDPOINT);
+      configuredRegion = getConf().get(CloudSearchConstants.REGION);
+    }
+
+    StringBuilder sb = new StringBuilder("CloudSearchIndexWriter\n");
+    sb.append("\t").append(CloudSearchConstants.ENDPOINT)
+        .append(" : URL of the CloudSearch domain's document endpoint.");
+    if (StringUtils.isNotBlank(configuredEndpoint)) {
+      sb.append(" (value: ").append(configuredEndpoint).append(")");
+    }
+    sb.append("\n");
+
+    sb.append("\t").append(CloudSearchConstants.REGION)
+        .append(" : name of the CloudSearch region.");
+    if (StringUtils.isNotBlank(configuredRegion)) {
+      sb.append(" (value: ").append(configuredRegion).append(")");
+    }
+    sb.append("\n");
+    return sb.toString();
+  }
+
+  /**
+   * Remove the non-cloudSearch-legal characters. Note that this might convert
+   * two fields to the same name.
+   *
+   * @param name
+   *          the Nutch field name
+   * @return the name lower-cased (locale-independently) with every character
+   *         outside [a-z_0-9] replaced by '_'
+   */
+  String cleanFieldName(String name) {
+    // Locale.ROOT: e.g. a Turkish default locale would lower-case 'I' to a
+    // dotless i, which would then be replaced by '_'
+    String lowercase = name.toLowerCase(Locale.ROOT);
+    return lowercase.replaceAll("[^a-z_0-9]", "_");
+  }
+
+}

Added: nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchUtils.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchUtils.java?rev=1697911&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchUtils.java (added)
+++ nutch/trunk/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchUtils.java Wed Aug 26 12:41:53 2015
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexwriter.cloudsearch;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.commons.codec.binary.Hex;
+
+/**
+ * Helpers for building CloudSearch documents: document IDs and string
+ * sanitising.
+ */
+public class CloudSearchUtils {
+
+  /** Digest used to derive document IDs from URLs. */
+  private static final String DIGEST_ALGORITHM = "SHA-512";
+
+  static {
+    // fail fast at class-load time if the algorithm is unavailable
+    newDigester();
+  }
+
+  /**
+   * Returns a fresh digester. MessageDigest instances are stateful and not
+   * thread-safe, so one is created per call instead of sharing a static one.
+   */
+  private static MessageDigest newDigester() {
+    try {
+      return MessageDigest.getInstance(DIGEST_ALGORITHM);
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns a normalised doc ID based on the URL of a document: the
+   * lower-case hex encoding of the SHA-512 digest of the URL's UTF-8 bytes.
+   *
+   * @param url
+   *          the document URL
+   * @return a 128-character hex ID
+   */
+  public static String getID(String url) {
+
+    // the document needs an ID
+    // @see
+    // http://docs.aws.amazon.com/cloudsearch/latest/developerguide/preparing-data.html#creating-document-batches
+    // A unique ID for the document. A document ID can contain any
+    // letter or number and the following characters: _ - = # ; : / ? @
+    // &. Document IDs must be at least 1 and no more than 128
+    // characters long.
+    byte[] dig = newDigester().digest(url.getBytes(StandardCharsets.UTF_8));
+    String ID = Hex.encodeHexString(dig);
+    // SHA-512 gives 64 bytes = 128 hex chars, exactly the maximum; this
+    // guards against a change of algorithm
+    if (ID.length() > 128) {
+      throw new RuntimeException("ID larger than max 128 chars");
+    }
+    return ID;
+  }
+
+  /**
+   * Returns {@code input} keeping only tab, LF, CR and chars in the range
+   * 0x20-0xFFFD. Surrogate halves fall inside that range, so supplementary
+   * characters are kept; 0xFFFE, 0xFFFF and other control chars are dropped.
+   *
+   * @param input
+   *          the string to clean
+   * @return the cleaned string
+   */
+  public static String stripNonCharCodepoints(String input) {
+    StringBuilder retval = new StringBuilder(input.length());
+
+    for (int i = 0; i < input.length(); i++) {
+      char ch = input.charAt(i);
+
+      // Keep only characters that are legal for CloudSearch
+      if ((ch == 0x9 || ch == 0xa || ch == 0xd)
+          || (ch >= 0x20 && ch <= 0xFFFD)) {
+        retval.append(ch);
+      }
+    }
+
+    return retval.toString();
+  }
+}