You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/12/31 20:46:25 UTC

svn commit: r1427229 [1/2] - in /manifoldcf/branches/CONNECTORS-594/connectors/solr: ./ connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/solr/

Author: kwright
Date: Mon Dec 31 19:46:25 2012
New Revision: 1427229

URL: http://svn.apache.org/viewvc?rev=1427229&view=rev
Log:
Port Solr connector to SolrJ

Modified:
    manifoldcf/branches/CONNECTORS-594/connectors/solr/build.xml
    manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
    manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConfig.java
    manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConnector.java
    manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/solr/common_en_US.properties
    manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/solr/common_ja_JP.properties

Modified: manifoldcf/branches/CONNECTORS-594/connectors/solr/build.xml
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-594/connectors/solr/build.xml?rev=1427229&r1=1427228&r2=1427229&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-594/connectors/solr/build.xml (original)
+++ manifoldcf/branches/CONNECTORS-594/connectors/solr/build.xml Mon Dec 31 19:46:25 2012
@@ -19,6 +19,30 @@
 
     <import file="../connector-build.xml"/>
 
+    <path id="connector-classpath">
+        <path refid="mcf-connector-build.connector-classpath"/>
+        <fileset dir="../../lib">
+            <include name="solr-solrj*.jar"/>
+            <include name="httpmime*.jar"/>
+            <include name="zookeeper*.jar"/>
+            <include name="wstx-asl*.jar"/>
+            <include name="jcl-over-slf4j*.jar"/>
+        </fileset>
+    </path>
+
+    <target name="lib" depends="mcf-connector-build.lib,precompile-check" if="canBuild">
+        <mkdir dir="dist/lib"/>
+        <copy todir="dist/lib">
+            <fileset dir="../../lib">
+                <include name="solr-solrj*.jar"/>
+                <include name="httpmime*.jar"/>
+                <include name="zookeeper*.jar"/>
+                <include name="wstx-asl*.jar"/>
+                <include name="jcl-over-slf4j*.jar"/>
+            </fileset>
+        </copy>
+    </target>
+
     <target name="integration">
         <mkdir dir="dist/integration/solr-3.x"/>
         <copy todir="dist/integration/solr-3.x">

Modified: manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java?rev=1427229&r1=1427228&r2=1427229&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java (original)
+++ manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java Mon Dec 31 19:46:25 2012
@@ -32,6 +32,33 @@ import javax.net.ssl.*;
 
 import org.apache.log4j.*;
 
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.client.params.ClientPNames;
+import org.apache.http.client.params.HttpClientParams;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.conn.ssl.AllowAllHostnameVerifier;
+import org.apache.http.conn.params.ConnRoutePNames;
+
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
+import org.apache.solr.client.solrj.impl.XMLResponseParser;
+import org.apache.solr.common.util.ContentStreamBase;
+
 /**
 * Posts an input stream to SOLR
 *
@@ -52,85 +79,81 @@ public class HttpPoster
   public static String ingestPasswordProperty = "org.apache.manifoldcf.ingest.password";
   public static String ingestMaxConnectionsProperty = "org.apache.manifoldcf.ingest.maxconnections";
 
-  // Chunk size for base64-encoded headers
-  protected final static int HEADER_CHUNK = 4096;
-
-  private String protocol;
-  private String host;
-  private javax.net.ssl.SSLSocketFactory socketFactory;
-  private int port;
-  private String encodedCredentials;
-  private String realm;
+  // Solrj connection-associated objects
+  protected ClientConnectionManager connectionManager = null;
+  protected SolrServer solrServer = null;
+  
+  // Action URI pieces
   private String postUpdateAction;
   private String postRemoveAction;
   private String postStatusAction;
+  
+  // Attribute names
   private String allowAttributeName;
   private String denyAttributeName;
   private String idAttributeName;
   
+  // Document max length
   private Long maxDocumentLength;
 
+  // Commit-within flag
   private String commitWithin;
-  
+
+  // Constants we need
   private static final String LITERAL = "literal.";
   private static final String NOTHING = "__NOTHING__";
   private static final String ID_METADATA = "lcf_metadata_id";
   private static final String COMMITWITHIN_METADATA = "commitWithin";
   
-  private int buffersize = 32768;  // default buffer size
-  double sizeCoefficient = 0.0005;    // 20 ms additional timeout per 2000 bytes, pulled out of my butt
-  /** the number of times we should poll for the response */
-  int responseRetries = 9000;         // Long basic wait: 3 minutes.  This will also be added to by a term based on the size of the request.
-  /** how long we should wait before checking for a new stream */
-  long responseRetryWait = 20L;
   /** How long to wait before retrying a failed ingestion */
-  long interruptionRetryTime = 60000L;
-
-  /** The multipart separator we're going to use.  I was thinking of including a random number, but that would wreck repeatability */
-  protected static byte[] separatorBytes = null;
-  protected static byte[] endBytes = null;
-  protected static byte[] postambleBytes = null;
-  protected static byte[] preambleBytes = null;
-  static
-  {
-    try
-    {
-      String separatorString = "------------------T-H-I-S--I-S--A--S-E-P-A-R-A-T-O-R--399123410141511";
-      separatorBytes = (separatorString+"\r\n").getBytes("ASCII");
-      endBytes = ("--"+separatorString+"--\r\n").getBytes("ASCII");
-      postambleBytes = "\r\n".getBytes("ASCII");
-      preambleBytes = "--".getBytes("ASCII");
-    }
-    catch (java.io.UnsupportedEncodingException e)
-    {
-      e.printStackTrace();
-      System.exit(1);
-    }
-  }
+  private static final long interruptionRetryTime = 60000L;
 
-  /** This is the secure socket factory we will use.  I'm presuming it's thread-safe, but
-  * if not, synchronization blocks are in order when it's used. */
-  protected static javax.net.ssl.SSLSocketFactory openSecureSocketFactory = null;
-  static
+  /** Initialize the SolrCloud http poster.
+  */
+  public HttpPoster(String zookeeperHosts, String collection,
+    int zkClientTimeout, int zkConnectTimeout,
+    String updatePath, String removePath, String statusPath,
+    String realm, String userID, String password,
+    String allowAttributeName, String denyAttributeName, String idAttributeName,
+    IKeystoreManager keystoreManager, Long maxDocumentLength,
+    String commitWithin)
+    throws ManifoldCFException
   {
+    // These are the paths to the handlers in Solr that deal with the actions we need to do
+    this.postUpdateAction = updatePath;
+    this.postRemoveAction = removePath;
+    this.postStatusAction = statusPath;
+    
+    this.commitWithin = commitWithin;
+    
+    this.allowAttributeName = allowAttributeName;
+    this.denyAttributeName = denyAttributeName;
+    this.idAttributeName = idAttributeName;
+    
+    this.maxDocumentLength = maxDocumentLength;
+    
+    // What to do with basic auth and keystore manager parameters?  Do they even make sense?
+    // MHL
+    
     try
     {
-      openSecureSocketFactory = getOpenSecureSocketFactory();
+      CloudSolrServer cloudSolrServer = new CloudSolrServer(zookeeperHosts);
+      cloudSolrServer.setZkClientTimeout(zkClientTimeout);
+      cloudSolrServer.setZkConnectTimeout(zkConnectTimeout);
+      cloudSolrServer.setDefaultCollection(collection);
+      // Set the solrj instance we want to use
+      solrServer = cloudSolrServer;
     }
-    catch (ManifoldCFException e)
+    catch (MalformedURLException e)
     {
-      // If we can't create, print and fail
-      e.printStackTrace();
-      System.exit(100);
+      throw new ManifoldCFException(e.getMessage(),e);
     }
   }
 
-  /**
-  * Initialized the http poster.
-  * @param userID is the unencoded user name, or null.
-  * @param password is the unencoded password, or null.
+  /** Initialize the standard http poster.
   */
-  public HttpPoster(String protocol, String server, int port, String webappName,
+  public HttpPoster(String protocol, String server, int port, String webapp, String core,
+    int connectionTimeout, int socketTimeout,
     String updatePath, String removePath, String statusPath,
     String realm, String userID, String password,
     String allowAttributeName, String denyAttributeName, String idAttributeName,
@@ -138,6 +161,11 @@ public class HttpPoster
     String commitWithin)
     throws ManifoldCFException
   {
+    // These are the paths to the handlers in Solr that deal with the actions we need to do
+    this.postUpdateAction = updatePath;
+    this.postRemoveAction = removePath;
+    this.postStatusAction = statusPath;
+    
     this.commitWithin = commitWithin;
     
     this.allowAttributeName = allowAttributeName;
@@ -145,54 +173,83 @@ public class HttpPoster
     this.idAttributeName = idAttributeName;
     
     this.maxDocumentLength = maxDocumentLength;
-    
-    this.host = server;
-    this.port = port;
-    this.protocol = protocol;
+
+    String location = "";
+    if (webapp != null)
+      location = "/" + webapp;
+    if (core != null)
+    {
+      if (webapp == null)
+        throw new ManifoldCFException("Webapp must be specified if core is specified.");
+      location += "/" + core;
+    }
+
+    // Initialize standard solr-j.
+    // First, we need an HttpClient where basic auth is properly set up.
+    PoolingClientConnectionManager localConnectionManager = new PoolingClientConnectionManager();
+    localConnectionManager.setMaxTotal(1);
+    SSLSocketFactory myFactory;
     if (keystoreManager != null)
-      this.socketFactory = keystoreManager.getSecureSocketFactory();
+    {
+      myFactory = new SSLSocketFactory(keystoreManager.getSecureSocketFactory(),
+        new AllowAllHostnameVerifier());
+    }
     else
-      // Use the "trust everything" one.
-      this.socketFactory = openSecureSocketFactory;
-
+    {
+      // Use the "trust everything" one
+      myFactory = new SSLSocketFactory(KeystoreManagerFactory.getTrustingSecureSocketFactory(),
+        new AllowAllHostnameVerifier());
+    }
+    Scheme myHttpsProtocol = new Scheme("https", 443, myFactory);
+    localConnectionManager.getSchemeRegistry().register(myHttpsProtocol);
+    connectionManager = localConnectionManager;
+          
+    BasicHttpParams params = new BasicHttpParams();
+    params.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY,true);
+    params.setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK,false);
+    params.setBooleanParameter(ClientPNames.ALLOW_CIRCULAR_REDIRECTS,true);
+    params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT,socketTimeout);
+    params.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT,connectionTimeout);
+    params.setBooleanParameter(ClientPNames.HANDLE_REDIRECTS,true);
+    DefaultHttpClient localClient = new DefaultHttpClient(connectionManager,params);
+          
     if (userID != null && userID.length() > 0 && password != null)
     {
-      try
-      {
-        encodedCredentials = new org.apache.manifoldcf.core.common.Base64().encodeByteArray((userID+":"+password).getBytes("UTF-8"));
-      }
-      catch (java.io.UnsupportedEncodingException e)
-      {
-        throw new ManifoldCFException("Couldn't convert to utf-8 bytes: "+e.getMessage(),e);
-      }
-      this.realm = realm;
+      Credentials credentials = new UsernamePasswordCredentials(userID, password);
+      if (realm != null)
+        localClient.getCredentialsProvider().setCredentials(new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, realm), credentials);
+      else
+        localClient.getCredentialsProvider().setCredentials(AuthScope.ANY, credentials);
     }
-    else
-      encodedCredentials = null;
-
-    String postURI = protocol + "://" + server + ":" + Integer.toString(port);
 
-    if (webappName.length() > 0)
-      webappName = "/" + webappName;
-      
-    postUpdateAction = webappName + updatePath;
-    postRemoveAction = webappName + removePath;
-    postStatusAction = webappName + statusPath;
-
-    String x = ManifoldCF.getProperty(ingestBufferSizeProperty);
-    if (x != null && x.length() > 0)
-      buffersize = new Integer(x).intValue();
-    x = ManifoldCF.getProperty(ingestResponseRetryCount);
-    if (x != null && x.length() > 0)
-      responseRetries = new Integer(x).intValue();
-    x = ManifoldCF.getProperty(ingestResponseRetryInterval);
-    if (x != null && x.length() > 0)
-      responseRetryWait = new Long(x).longValue();
-    x = ManifoldCF.getProperty(ingestRescheduleInterval);
-    if (x != null && x.length() > 0)
-      interruptionRetryTime = new Long(x).longValue();
+    String httpSolrServerUrl = protocol + "://" + server + ":" + port + location;
+    HttpSolrServer httpSolrServer = new HttpSolrServer(httpSolrServerUrl, localClient);
+    // For portability with older versions of Solr
+    httpSolrServer.setParser(new XMLResponseParser());
+    // Set the solrj instance we want to use
+    solrServer = httpSolrServer;
   }
 
+  /** Poll the poster.
+  */
+  public void poll()
+  {
+    if (connectionManager != null)
+      connectionManager.closeIdleConnections(60000L,TimeUnit.MILLISECONDS);
+  }
+  
+  /** Shut down the poster.
+  */
+  public void shutdown()
+  {
+    if (solrServer != null)
+      solrServer.shutdown();
+    solrServer = null;
+    if (connectionManager != null)
+      connectionManager.shutdown();
+    connectionManager = null;
+  }
+  
   /** Cause a commit to happen.
   */
   public void commitPost()
@@ -216,10 +273,8 @@ public class HttpPoster
           Throwable thr = t.getException();
           if (thr != null)
           {
-            if (thr instanceof ServiceInterruption)
-              throw (ServiceInterruption)thr;
-            if (thr instanceof ManifoldCFException)
-              throw (ManifoldCFException)thr;
+            if (thr instanceof SolrServerException)
+              throw (SolrServerException)thr;
             if (thr instanceof IOException)
               throw (IOException)thr;
             if (thr instanceof RuntimeException)
@@ -235,6 +290,10 @@ public class HttpPoster
           throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
         }
       }
+      catch (SolrServerException e)
+      {
+        throw new ManifoldCFException("Solr exception during commit: "+e.getMessage(),e);
+      }
       catch (IOException ioe)
       {
         if (ioErrorRetry == 0)
@@ -248,7 +307,7 @@ public class HttpPoster
             true);
         }
       }
-
+      
       // Go back around again!
       // Sleep for a time, and retry
       try
@@ -265,6 +324,7 @@ public class HttpPoster
 
   }
   
+
   /**
   * Post the input stream to ingest
   * @param documentURI is the document's uri.
@@ -321,10 +381,8 @@ public class HttpPoster
           Throwable thr = t.getException();
           if (thr != null)
           {
-            if (thr instanceof ServiceInterruption)
-              throw (ServiceInterruption)thr;
-            if (thr instanceof ManifoldCFException)
-              throw (ManifoldCFException)thr;
+            if (thr instanceof SolrServerException)
+              throw (SolrServerException)thr;
             if (thr instanceof IOException)
               throw (IOException)thr;
             if (thr instanceof RuntimeException)
@@ -340,13 +398,17 @@ public class HttpPoster
           throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
         }
       }
+      catch (SolrServerException e)
+      {
+        throw new ManifoldCFException("Solr exception during indexing: "+e.getMessage(),e);
+      }
       catch (java.net.SocketTimeoutException ioe)
       {
         if (readFromDocumentStreamYet || ioErrorRetry == 0)
         {
           // If this continues, we should indeed abort the job.  Retries should not go on indefinitely either; 2 hours is plenty
           long currentTime = System.currentTimeMillis();
-          throw new ServiceInterruption("IO error connecting to ingestion API: "+ioe.getMessage()+"; ingestion will be retried again later",
+          throw new ServiceInterruption("IO error indexing: "+ioe.getMessage()+"; ingestion will be retried again later",
             ioe,
             currentTime + interruptionRetryTime,
             currentTime + 2L * 60L * 60000L,
@@ -409,10 +471,8 @@ public class HttpPoster
           Throwable thr = t.getException();
           if (thr != null)
           {
-            if (thr instanceof ServiceInterruption)
-              throw (ServiceInterruption)thr;
-            if (thr instanceof ManifoldCFException)
-              throw (ManifoldCFException)thr;
+            if (thr instanceof SolrServerException)
+              throw (SolrServerException)thr;
             if (thr instanceof IOException)
               throw (IOException)thr;
             if (thr instanceof RuntimeException)
@@ -428,6 +488,10 @@ public class HttpPoster
           throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
         }
       }
+      catch (SolrServerException e)
+      {
+        throw new ManifoldCFException("Solr exception during check: "+e.getMessage(),e);
+      }
       catch (IOException ioe)
       {
         if (ioErrorRetry == 0)
@@ -485,10 +549,8 @@ public class HttpPoster
           Throwable thr = t.getException();
           if (thr != null)
           {
-            if (thr instanceof ServiceInterruption)
-              throw (ServiceInterruption)thr;
-            if (thr instanceof ManifoldCFException)
-              throw (ManifoldCFException)thr;
+            if (thr instanceof SolrServerException)
+              throw (SolrServerException)thr;
             if (thr instanceof IOException)
               throw (IOException)thr;
             if (thr instanceof RuntimeException)
@@ -504,6 +566,10 @@ public class HttpPoster
           throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
         }
       }
+      catch (SolrServerException e)
+      {
+        throw new ManifoldCFException("Solr exception during delete: "+e.getMessage(),e);
+      }
       catch (IOException ioe)
       {
         if (ioErrorRetry == 0)
@@ -559,461 +625,27 @@ public class HttpPoster
     return new String[0];
   }
 
-  /**
-  * Read an ascii line from an input stream
-  */
-  protected static String readLine(InputStream in)
-    throws IOException
-  {
-    ByteBuffer bb = new ByteBuffer();
-    while (true)
-    {
-      int x = in.read();
-      if (x == -1)
-        throw new IOException("Unexpected EOF");
-      if (x == 13)
-        continue;
-      if (x == 10)
-        break;
-
-      bb.append((byte)x);
-    }
-    return bb.toString("ASCII");
-  }
-
-  /**
-  * Get the response code of the post
-  * @param in the stream the response is going to come from
-  * @return the response details.
-  * @throws ManifoldCFException
-  */
-  protected CodeDetails getResponse(InputStream in) throws ManifoldCFException, ServiceInterruption
-  {
-    Logging.ingest.debug("Waiting for response stream");
-
-    try
-    {
-      // Stream.ready() always returns false for secure sockets :-(.  So
-      // we have to rely on socket timeouts to interrupt us if the server goes down.
-      String responseCode = readLine(in);
-
-      if (Logging.ingest.isDebugEnabled())
-        Logging.ingest.debug("Response code from ingest: '" + responseCode + "'");
-
-      // Read the response headers
-      String contentType = "text/plain; charset=iso-8859-1";
-      while (true)
-      {
-        String headerLine = readLine(in);
-        if (headerLine.length() == 0)
-          break;
-        // Look for the headers we care about, ignore the rest...
-        int spaceIndex = headerLine.indexOf(" ");
-        if (spaceIndex != -1)
-        {
-          String headerName = headerLine.substring(0,spaceIndex);
-          String headerValue = headerLine.substring(spaceIndex).trim().toLowerCase();
-          if (headerName.toLowerCase().equals("content-type:"))
-          {
-            contentType = headerValue;
-          }
-        }
-      }
-
-      // Now read the response data.  It's safe to assemble the data in memory.
-      int charsetIndex = contentType.indexOf("charset=");
-      String charsetName = "iso-8859-1";
-      if (charsetIndex != -1)
-        charsetName = contentType.substring(charsetIndex+8);
-
-      // NOTE: We may get back an unparseable pile of goo here, especially if the error is a 500 error.
-      // But we can't hand the binary to the XML parser and still be able to get at the raw data.  So we
-      // read the data into memory first (as binary), and then make a decision based on parseability as to whether
-      // we attempt to decode it.
-      byte[] responseContent = readInputStream(in);
-      
-      XMLDoc doc = null;
-      String rawString = null;
-      try
-      {
-        doc = new XMLDoc(new ByteArrayInputStream(responseContent));
-      }
-      catch (ManifoldCFException e)
-      {
-        // Syntax errors should be eaten; we'll just return a null doc in that case.
-        // But we do try to convert the raw data to string form.
-        try
-        {
-          rawString = new String(responseContent,charsetName);
-        }
-        catch (UnsupportedEncodingException e2)
-        {
-          // Uh oh, can't even convert it to a string.  Now we are desperate.
-          rawString = "Response had an illegal encoding: "+e2.getMessage();
-          e.printStackTrace();
-        }
-      }
-
-      Logging.ingest.debug("Read of response stream complete");
-      return new CodeDetails(responseCode,doc,rawString);
-    }
-    catch (java.net.SocketTimeoutException e)
-    {
-      // If this continues, we should indeed abort the job.  Retries should not go on indefinitely either; 2 hours is plenty
-      long currentTime = System.currentTimeMillis();
-      throw new ServiceInterruption("Ingestion API socket timeout exception waiting for response code: "+e.getMessage()+"; ingestion will be retried again later",
-        e,
-        currentTime + interruptionRetryTime,
-        currentTime + 2L * 60L * 60000L,
-        -1,
-        true);
-    }
-    catch (InterruptedIOException e)
-    {
-      throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
-    }
-    catch (java.net.ConnectException e)
-    {
-      // If this continues, we should indeed abort the job.  Retries should not go on indefinitely either; 2 hours is plenty
-      long currentTime = System.currentTimeMillis();
-      throw new ServiceInterruption("Timed out connecting to ingestion API: "+e.getMessage()+"; ingestion will be retried again later",
-        e,
-        currentTime + interruptionRetryTime,
-        currentTime + 2L * 60L * 60000L,
-        -1,
-        true);
-    }
-    catch (java.net.SocketException e)
-    {
-      // Return 400 error; likely a connection reset which lost us the response data, so
-      // just treat it as something OK.
-      return new CodeDetails("HTTP/1.0 400 Connection Reset",null,null);
-
-    }
-    catch (IOException ioe)
-    {
-      Logging.ingest.warn("IO exception trying to get response from ingestion API: "+ioe.getMessage(),ioe);
-      // If this continues, we should indeed abort the job.  Retries should not go on indefinitely either; 2 hours is plenty
-      long currentTime = System.currentTimeMillis();
-      throw new ServiceInterruption("IO exception waiting for response code: "+ioe.getMessage()+"; ingestion will be retried again later",
-        ioe,
-        currentTime + interruptionRetryTime,
-        currentTime + 2L * 60L * 60000L,
-        -1,
-        true);
-    }
-  }
-
-  /** Read input stream into an in-memory array */
-  protected static byte[] readInputStream(InputStream is)
-    throws IOException
-  {
-    // Create an array of byte arrays, and assemble the result into a final piece at the end.
-    List array = new ArrayList();
-    int count = 0;
-    while (true)
-    {
-      byte[] buffer = new byte[65536];
-      int amt = is.read(buffer);
-      if (amt == -1)
-        break;
-      count += amt;
-      array.add(buffer);
-    }
-    byte[] rval = new byte[count];
-    int pointer = 0;
-    int index = 0;
-    while (pointer < count)
-    {
-      byte[] buffer = (byte[])array.get(index++);
-      if (buffer.length > count-pointer)
-      {
-        System.arraycopy(buffer,0,rval,pointer,count-pointer);
-        pointer = count;
-      }
-      else
-      {
-        System.arraycopy(buffer,0,rval,pointer,buffer.length);
-        pointer += buffer.length;
-      }
-    }
-    return rval;
-  }
-  
-  /** Write credentials to output */
-  protected void writeCredentials(OutputStream out)
-    throws IOException
-  {
-    // Apply credentials if present
-    if (encodedCredentials != null)
-    {
-      Logging.ingest.debug("Applying credentials");
-      byte[] tmp = ("Authorization: Basic " + encodedCredentials + "\r\n").getBytes("UTF-8");
-      out.write(tmp, 0, tmp.length);
-
-      tmp = ("WWW-Authenticate: Basic realm=\"" + ((realm != null) ? realm : "") + "\"\r\n").getBytes("UTF-8");
-      out.write(tmp, 0, tmp.length);
-    }
-  }
-
-  /** Build a secure socket factory based on no keystore and a lax trust manager.
-  * This allows use of SSL for privacy but not identification. */
-  protected static javax.net.ssl.SSLSocketFactory getOpenSecureSocketFactory()
-    throws ManifoldCFException
-  {
-    try
-    {
-      java.security.SecureRandom secureRandom = java.security.SecureRandom.getInstance("SHA1PRNG");
-
-      // Create an SSL context
-      javax.net.ssl.SSLContext sslContext = javax.net.ssl.SSLContext.getInstance("SSL");
-      sslContext.init(null,new LaxTrustManager[]{new LaxTrustManager()},secureRandom);
-      return sslContext.getSocketFactory();
-    }
-    catch (java.security.NoSuchAlgorithmException e)
-    {
-      throw new ManifoldCFException("No such algorithm: "+e.getMessage(),e);
-    }
-    catch (java.security.KeyManagementException e)
-    {
-      throw new ManifoldCFException("Key management exception: "+e.getMessage(),e);
-    }
-  }
-
-  /** Create a socket in a manner consistent with all of our specified parameters.
-  */
-  protected Socket createSocket(long responseRetryCount)
-    throws IOException, ManifoldCFException
-  {
-    Socket socket;
-    if (protocol.equals("https") && socketFactory != null)
-    {
-      try
-      {
-        //SocketFactory factory = SSLSocketFactory.getDefault();
-        socket = socketFactory.createSocket(host,port);
-      }
-      catch (InterruptedIOException e)
-      {
-        throw e;
-      }
-      catch (IOException e)
-      {
-        throw new ManifoldCFException("Couldn't set up SSL connection to ingestion API: "+e.getMessage(),e);
-      }
-    }
-    else
-      socket = new Socket(host, port);
-
-    // Calculate the timeout we want
-    long timeoutMilliseconds = responseRetryWait * responseRetryCount;
-    socket.setSoTimeout((int)timeoutMilliseconds);
-
-    return socket;
-  }
-
-  /** Byte buffer class */
-  protected static class ByteBuffer
-  {
-    byte[] theBuffer;
-    int bufferAmt;
-
-    public ByteBuffer()
-    {
-      createBuffer(64);
-    }
-
-    protected void createBuffer(int size)
-    {
-      theBuffer = new byte[size];
-    }
-
-    public void append(byte b)
-    {
-      if (bufferAmt == theBuffer.length)
-      {
-        byte[] oldBuffer = theBuffer;
-        createBuffer(bufferAmt * 2);
-        int i = 0;
-        while (i < bufferAmt)
-        {
-          theBuffer[i] = oldBuffer[i];
-          i++;
-        }
-      }
-      theBuffer[bufferAmt++] = b;
-    }
-
-    public String toString(String encoding)
-      throws java.io.UnsupportedEncodingException
-    {
-      return new String(theBuffer,0,bufferAmt,encoding);
-    }
-
-  }
-
-  /** Our own trust manager, which ignores certificate issues */
-  protected static class LaxTrustManager implements X509TrustManager
-  {
-    /** Does nothing */
-    public LaxTrustManager()
-    {
-    }
-
-    /** Return a list of accepted issuers.  There are none. */
-    public java.security.cert.X509Certificate[] getAcceptedIssuers()
-    {
-      return new java.security.cert.X509Certificate[0];
-    }
-
-    /** We have no problem with any clients */
-    public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType)
-      throws java.security.cert.CertificateException
-    {
-    }
-
-    /** We have no problem with any servers */
-    public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType)
-      throws java.security.cert.CertificateException
-    {
-    }
-
-  }
-
-  /** Calculate the length of the preamble */
-  protected static int lengthPreamble()
-    throws IOException
-  {
-    return preambleBytes.length;
-  }
-
-  /** Calculate the length of a boundary */
-  protected static int lengthBoundary(String contentType, String name, String fileName)
-    throws IOException
-  {
-    int rval = 0;
-    rval += separatorBytes.length;
-    String value = "Content-Disposition: form-data";
-    if (name != null)
-      value += "; name=\""+name+"\"";
-    if (fileName != null)
-      value += "; filename=\""+fileName+"\"";
-    value += "\r\n";
-    byte[] tmp = value.getBytes("UTF-8");
-    rval += tmp.length;
-    tmp = ("Content-Type: "+contentType+"\r\n\r\n").getBytes("ASCII");
-    rval += tmp.length;
-    return rval;
-  }
-
-  /** Calculate the length of the postamble */
-  protected static int lengthPostamble()
-    throws IOException
-  {
-    return postambleBytes.length;
-  }
-
-  /** Calculate the length of a field */
-  protected static int lengthField(String fieldName, String fieldValue)
-    throws IOException
-  {
-    int rval = lengthPreamble() + lengthBoundary("text/plain; charset=UTF-8",fieldName,null);
-    byte[] tmp = fieldValue.getBytes("UTF-8");
-    rval += tmp.length;
-    rval += lengthPostamble();
-    return rval;
-  }
-
-  /** Count the size of an acl level */
-  protected int lengthACLs(String aclType, String[] acl, String[] denyAcl)
-    throws IOException
-  {
-    int totalLength = 0;
-    String metadataACLName = LITERAL + allowAttributeName + aclType;
-    int i = 0;
-    while (i < acl.length)
-    {
-      totalLength += lengthField(metadataACLName,acl[i++]);
-    }
-    String metadataDenyACLName = LITERAL + denyAttributeName + aclType;
-    i = 0;
-    while (i < denyAcl.length)
-    {
-      totalLength += lengthField(metadataDenyACLName,denyAcl[i++]);
-    }
-    return totalLength;
-  }
-
-  /** Write the preamble */
-  protected static void writePreamble(OutputStream out)
-    throws IOException
-  {
-    out.write(preambleBytes, 0, preambleBytes.length);
-  }
-
-  /** Write a boundary */
-  protected static void writeBoundary(OutputStream out, String contentType, String name, String fileName)
-    throws IOException
-  {
-    out.write(separatorBytes, 0, separatorBytes.length);
-    String value = "Content-Disposition: form-data";
-    if (name != null)
-      value += "; name=\""+name+"\"";
-    if (fileName != null)
-      value += "; filename=\""+fileName+"\"";
-    value += "\r\n";
-    byte[] tmp = value.getBytes("UTF-8");
-    out.write(tmp, 0, tmp.length);
-    tmp = ("Content-Type: "+contentType+"\r\n\r\n").getBytes("ASCII");
-    out.write(tmp, 0, tmp.length);
-  }
-
-  /** Write the postamble */
-  protected static void writePostamble(OutputStream out)
-    throws IOException
-  {
-    out.write(postambleBytes, 0, postambleBytes.length);
-  }
-
   /** Write a field */
-  protected static void writeField(OutputStream out, String fieldName, String fieldValue)
-    throws IOException
+  protected static void writeField(ContentStreamUpdateRequest out, String fieldName, String fieldValue)
   {
-    writePreamble(out);
-    writeBoundary(out,"text/plain; charset=UTF-8",fieldName,null);
-    byte[] tmp = fieldValue.getBytes("UTF-8");
-    out.write(tmp, 0, tmp.length);
-    writePostamble(out);
+    out.setParam(fieldName,fieldValue);
   }
 
-  
   /** Output an acl level */
-  protected void writeACLs(OutputStream out, String aclType, String[] acl, String[] denyAcl)
-    throws IOException
+  protected void writeACLs(ContentStreamUpdateRequest out, String aclType, String[] acl, String[] denyAcl)
   {
     String metadataACLName = LITERAL + allowAttributeName + aclType;
-    int i = 0;
-    while (i < acl.length)
+    for (int i = 0; i < acl.length; i++)
     {
-      writeField(out,metadataACLName,acl[i++]);
+      writeField(out,metadataACLName,acl[i]);
     }
     String metadataDenyACLName = LITERAL + denyAttributeName + aclType;
-    i = 0;
-    while (i < denyAcl.length)
+    for (int i = 0; i < denyAcl.length; i++)
     {
-      writeField(out,metadataDenyACLName,denyAcl[i++]);
+      writeField(out,metadataDenyACLName,denyAcl[i]);
     }
   }
   
-  /** XML encoding */
-  protected static String xmlEncode(String input)
-  {
-    StringBuilder sb = new StringBuilder("<![CDATA[");
-    sb.append(input);
-    sb.append("]]>");
-    return sb.toString();
-  }
-  
   /** Killable thread that does ingestions.
   * Java 1.5 stopped permitting thread interruptions to abort socket waits.  As a result, it is impossible to get threads to shutdown cleanly that are doing
   * such waits.  So, the places where this happens are segregated in their own threads so that they can be just abandoned.
@@ -1024,8 +656,8 @@ public class HttpPoster
   {
     protected String documentURI;
     protected RepositoryDocument document;
-    protected Map arguments;
-    protected Map sourceTargets;
+    protected Map<String,List<String>> arguments;
+    protected Map<String,String> sourceTargets;
     protected String[] shareAcls;
     protected String[] shareDenyAcls;
     protected String[] acls;
@@ -1040,7 +672,8 @@ public class HttpPoster
     protected boolean readFromDocumentStreamYet = false;
     protected boolean rval = false;
 
-    public IngestThread(String documentURI, RepositoryDocument document, Map arguments, Map sourceTargets,
+    public IngestThread(String documentURI, RepositoryDocument document,
+      Map<String,List<String>> arguments, Map<String,String> sourceTargets,
       String[] shareAcls, String[] shareDenyAcls, String[] acls, String[] denyAcls, String commitWithin)
     {
       super();
@@ -1069,366 +702,99 @@ public class HttpPoster
         // Open a socket to ingest, and to the response stream to get the post result
         try
         {
-          // Set up the socket, and the (optional) secure socket.
-          long responseRetryCount = responseRetries + (long)((float)length * sizeCoefficient);
-          Socket socket = createSocket(responseRetryCount);
-          try
+          ContentStreamUpdateRequest contentStreamUpdateRequest = new ContentStreamUpdateRequest(postUpdateAction);
+          
+          // Write the id field
+          writeField(contentStreamUpdateRequest,LITERAL+idAttributeName,documentURI);
+
+          // Write the access token information
+          writeACLs(contentStreamUpdateRequest,"share",shareAcls,shareDenyAcls);
+          writeACLs(contentStreamUpdateRequest,"document",acls,denyAcls);
+
+          // Write the arguments
+          for (String name : arguments.keySet())
           {
-            InputStream in = socket.getInputStream();
-            try
+            List<String> values = arguments.get(name);
+            for (String value : values)
             {
-              OutputStream out = socket.getOutputStream();
-              try
-              {
-                // Create the output stream to SOLR
-                byte[] tmp = ("POST " + postUpdateAction + " HTTP/1.0\r\n").getBytes("ASCII");
-                out.write(tmp, 0, tmp.length);
-
-                // Set all the headers
-                writeCredentials(out);
-
-                // Headers must include the following:
-                // Content-Type
-                // Content-Length
-                // The content-length is calculated using the entire body length, which therefore includes the length of all the metadata fields as well.
-
-                // Come up with a boundary.  Ideally, the boundary should be something that doesn't exist in any of the data.  In practice, that would mean
-                // scanning all such data at least twice: once to make sure we avoided all boundary collisions, and a second time to actually output the data.
-                // This is such a huge chunk of overhead, I've decided for now to just punt and pick something that's pretty unlikely.
-
-
-                // Calculate the content length.  To do this, we have to walk through the entire multipart assembly process, but calculate the length rather than output
-                // anything.
-
-                int totalLength = 0;
-                // Count the id.
-                totalLength += lengthField(LITERAL+idAttributeName,documentURI);
-                // Count the acls
-                totalLength += lengthACLs("share",shareAcls,shareDenyAcls);
-                totalLength += lengthACLs("document",acls,denyAcls);
-                // Count the arguments
-                Iterator iter = arguments.keySet().iterator();
-                while (iter.hasNext())
-                {
-                  String name = (String)iter.next();
-                  List values = (List)arguments.get(name);
-                  int j = 0;
-                  while (j < values.size())
-                  {
-                    String value = (String)values.get(j++);
-                    totalLength += lengthField(name,value);
-                  }
-                }
-                // Count the metadata.
-                iter = document.getFields();
-                while (iter.hasNext())
-                {
-                  String fieldName = (String)iter.next();
-                  String newFieldName = (String)sourceTargets.get(fieldName);
-                  if (newFieldName == null)
-                    newFieldName = fieldName;
-		  // Make SURE we can't double up on the id field inadvertantly!
-		  if (newFieldName.length() > 0)
-		  {
-		    if (newFieldName.toLowerCase().equals(idAttributeName.toLowerCase()))
-		      newFieldName = ID_METADATA;
-                    String[] values = document.getFieldAsStrings(fieldName);
-                    // We only handle strings right now!!!
-                    int k = 0;
-                    while (k < values.length)
-                    {
-                      String value = values[k++];
-                      totalLength += lengthField(LITERAL+newFieldName,value);
-                    }
-                  }
-                }
-                // Count the commitWithin parameter
-                if (commitWithin != null)
-                  totalLength += lengthField(COMMITWITHIN_METADATA,commitWithin);
-                // Count the binary data
-                totalLength += lengthPreamble();
-                totalLength += lengthBoundary("application/octet-stream","myfile",document.getFileName());
-                totalLength += length;
-                // Count the postamble
-                totalLength += lengthPostamble();
-                // Count the end marker.
-                totalLength += endBytes.length;
-
-                // Now, output the content-length header, and another newline, to start the data.
-                tmp = ("Content-Length: "+Integer.toString(totalLength)+"\r\n").getBytes("ASCII");
-                out.write(tmp, 0, tmp.length);
-
-                tmp = ("Content-Type: multipart/form-data; boundary=").getBytes("ASCII");
-                out.write(tmp, 0, tmp.length);
-                out.write(separatorBytes, 0, separatorBytes.length);
-
-                // End of headers.
-                tmp = "\r\n".getBytes("ASCII");
-                out.write(tmp, 0, tmp.length);
-
-                // Write the id field
-                writeField(out,LITERAL+idAttributeName,documentURI);
-
-		// Write the access token information
-                writeACLs(out,"share",shareAcls,shareDenyAcls);
-                writeACLs(out,"document",acls,denyAcls);
-
-                // Write the arguments
-                iter = arguments.keySet().iterator();
-                while (iter.hasNext())
-                {
-                  String name = (String)iter.next();
-                  List values = (List)arguments.get(name);
-                  int j = 0;
-                  while (j < values.size())
-                  {
-                    String value = (String)values.get(j++);
-                    writeField(out,name,value);
-                  }
-                }
-
-                // Write the metadata, each in a field by itself
-                iter = document.getFields();
-                while (iter.hasNext())
-                {
-                  String fieldName = (String)iter.next();
-                  String newFieldName = (String)sourceTargets.get(fieldName);
-                  if (newFieldName == null)
-                    newFieldName = fieldName;
-		  if (newFieldName.length() > 0)
-		  {
-		    if (newFieldName.toLowerCase().equals(idAttributeName.toLowerCase()))
-		      newFieldName = ID_METADATA;
-                    String[] values = document.getFieldAsStrings(fieldName);
-                    // We only handle strings right now!!!
-                    int k = 0;
-                    while (k < values.length)
-                    {
-                      String value = values[k++];
-                      writeField(out,LITERAL+newFieldName,value);
-                    }
-                  }
-                }
-                
-                // Write the commitWithin parameter
-                if (commitWithin != null)
-                  writeField(out,COMMITWITHIN_METADATA,commitWithin);
-
-                // Write the content
-                writePreamble(out);
-
-                writeBoundary(out,"application/octet-stream","myfile",document.getFileName());
-
-                // Stream the data
-                long total = 0;
-                long now, later;
-                now = System.currentTimeMillis();
-
-                byte[] bytes = new byte[buffersize];
-
-                // Write out the contents of the inputstream to the socket
-                while (true)
-                {
-                  int count;
-                  // Specially catch all errors that come from reading the input stream itself.
-                  // This will help us segregate errors that come from the stream vs. those that come from the ingestion system.
-                  try
-                  {
-                    count = is.read(bytes);
-                  }
-                  catch (java.net.SocketTimeoutException ioe)
-                  {
-                    // We have to catch socket timeout exceptions specially, because they are derived from InterruptedIOException
-                    // They are otherwise just like IOExceptions
-
-                    // Log the error
-                    Logging.ingest.warn("Error reading data for transmission to Ingestion API: "+ioe.getMessage(),ioe);
-
-                    activityStart = new Long(fullStartTime);
-                    activityCode = "-1";
-                    activityDetails = "Couldn't read document: "+ioe.getMessage();
-
-                    // If this continues, we should indeed abort the job.  Retries should not go on indefinitely either; 2 hours is plenty
-                    long currentTime = System.currentTimeMillis();
-                    throw new ServiceInterruption("IO error reading document for ingestion: "+ioe.getMessage()+"; read will be retried again later",
-                      ioe,
-                      currentTime + interruptionRetryTime,
-                      currentTime + 2L * 60L * 60000L,
-                      -1,
-                      true);
-
-                  }
-                  catch (InterruptedIOException ioe)
-                  {
-                    // If the transfer was interrupted, it may be because we are shutting down the thread.
-
-                    // Third-party library exceptions derived from InterruptedIOException are possible; if the stream comes from httpclient especially.
-                    // If we see one of these, we treat it as "not an interruption".
-                    if (!ioe.getClass().getName().equals("java.io.InterruptedIOException"))
-                    {
-                      // Log the error
-                      Logging.ingest.warn("Error reading data for transmission to Ingestion API: "+ioe.getMessage(),ioe);
-
-                      activityStart = new Long(fullStartTime);
-                      activityCode = "-1";
-                      activityDetails = "Couldn't read document: "+ioe.getMessage();
-
-                      // If this continues, we should indeed abort the job.  Retries should not go on indefinitely either; 2 hours is plenty
-                      long currentTime = System.currentTimeMillis();
-                      throw new ServiceInterruption("IO error reading document for ingestion: "+ioe.getMessage()+"; read will be retried again later",
-                        ioe,
-                        currentTime + interruptionRetryTime,
-                        currentTime + 2L * 60L * 60000L,
-                        -1,
-                        true);
-                    }
-                    else
-                      throw ioe;
-                  }
-                  catch (IOException ioe)
-                  {
-                    // We need to decide whether to throw a service interruption or lcf exception, based on what went wrong.
-                    // We never retry here; the cause is the repository, so there's not any point.
-
-                    // Log the error
-                    Logging.ingest.warn("Error reading data for transmission to Ingestion API: "+ioe.getMessage(),ioe);
-
-                    activityStart = new Long(fullStartTime);
-                    activityCode = "-1";
-                    activityDetails = "Couldn't read document: "+ioe.getMessage();
-
-                    // If this continues, we should indeed abort the job.  Retries should not go on indefinitely either; 2 hours is plenty
-                    long currentTime = System.currentTimeMillis();
-                    throw new ServiceInterruption("IO error reading document for ingestion: "+ioe.getMessage()+"; read will be retried again later",
-                      ioe,
-                      currentTime + interruptionRetryTime,
-                      currentTime + 2L * 60L * 60000L,
-                      -1,
-                      true);
-                  }
-
-                  if (count == -1)
-                    break;
-                  readFromDocumentStreamYet = true;
-                  out.write(bytes,0,count);
-                  total += (long)count;
-                }
-
-                // Write the postamble
-                writePostamble(out);
-
-                // Write the end marker
-                out.write(endBytes, 0, endBytes.length);
-
-                out.flush();
-
-                later = System.currentTimeMillis();
-                if (Logging.ingest.isDebugEnabled())
-                  Logging.ingest.debug("Total bytes posted: " + new Long(total).toString() + ", total time: " + (later - now));
-
-                // Now, process response
-                CodeDetails cd;
-                try
-                {
-                  cd = getResponse(in);
-                }
-                catch (ServiceInterruption si)
-                {
-                  activityStart = new Long(now);
-                  activityCode = "-2";
-                  activityDetails = si.getMessage();
-                  throw si;
-                }
-
-
-                activityStart = new Long(now);
-                activityBytes = new Long(length);
-                activityCode = cd.getCode();
-                activityDetails = cd.getDetails();
-
-                int codeValue = cd.getCodeValue();
-
-                // A negative number means http error of some kind.
-                if (codeValue < 0)
-                  throw new ManifoldCFException("Http protocol error");
-
-                // 200 means we got a status document back
-                if (codeValue == 200)
-                {
-                  // Look at response XML
-                  cd.parseIngestionResponse();
-                  rval = true;
-                  return;
-                }
-
-                // Anything else means the document didn't ingest.
-                // There are three possibilities here:
-                // 1) The document will NEVER ingest (it's illegal), in which case a 400 or 403 will be returned, and
-                // 2) There is a transient error, in which case we will want to try again, after a wait.
-                //    If the situation is (2), then we CAN'T retry if we already read any of the stream; therefore
-                //    we are forced to throw a "service interrupted" exception, and let the caller reschedule
-                //    the ingestion.
-                // 3) Something is wrong with the setup, e.g. bad credentials.  In this case we chuck a ManifoldCFException,
-                //    since this will abort the current activity entirely.
-
-                if (codeValue == 401)
-                  throw new ManifoldCFException("Bad credentials for ingestion",ManifoldCFException.SETUP_ERROR);
-
-                if ((codeValue >= 400 && codeValue < 500) ||
-                  (codeValue == 500 && cd.getDetails() != null && cd.getDetails().indexOf("org.apache.tika.exception.TikaException") != -1))
-                {
-                  rval = false;
-                  return;
-                }
-
-                // If this continues, we should indeed abort the job.  Retries should not go on indefinitely either; 2 hours is plenty
-                long currentTime = System.currentTimeMillis();
-                throw new ServiceInterruption("Error "+Integer.toString(codeValue)+" from ingestion request; ingestion will be retried again later",
-                  new ManifoldCFException("Ingestion HTTP error code "+Integer.toString(codeValue)),
-                  currentTime + interruptionRetryTime,
-                  currentTime + 2L * 60L * 60000L,
-                  -1,
-                  true);
-              }
-              finally
+              writeField(contentStreamUpdateRequest,name,value);
+            }
+          }
+
+          // Write the metadata, each in a field by itself
+          Iterator<String> iter = document.getFields();
+          while (iter.hasNext())
+          {
+            String fieldName = iter.next();
+            String newFieldName = sourceTargets.get(fieldName);
+            if (newFieldName == null)
+              newFieldName = fieldName;
+            if (newFieldName.length() > 0)
+            {
+              if (newFieldName.toLowerCase().equals(idAttributeName.toLowerCase()))
+                newFieldName = ID_METADATA;
+              String[] values = document.getFieldAsStrings(fieldName);
+              // We only handle strings right now!!!
+              for (String value : values)
               {
-                out.close();
+                writeField(contentStreamUpdateRequest,LITERAL+newFieldName,value);
               }
             }
-            finally
-            {
-              in.close();
-            }
           }
-          finally
+                
+          // Write the commitWithin parameter
+          if (commitWithin != null)
+            writeField(contentStreamUpdateRequest,COMMITWITHIN_METADATA,commitWithin);
+
+          contentStreamUpdateRequest.addContentStream(new RepositoryDocumentStream(is,length));
+
+          // Fire off the request.
+          // Note: I need to know whether the document has been permanently rejected or not, but we currently have
+          // no means to determine that.  Analysis of SolrServerExceptions that have been thrown is likely needed.
+          try
           {
-            try
-            {
-              socket.close();
-            }
-            catch (InterruptedIOException e)
-            {
-              throw e;
-            }
-            catch (IOException e)
-            {
-              Logging.ingest.debug("Error closing socket: "+e.getMessage(),e);
-              // Do NOT rethrow
-            }
+            readFromDocumentStreamYet = true;
+            solrServer.request(contentStreamUpdateRequest);
+            
+            // Successful completion
+            activityStart = new Long(fullStartTime);
+            activityBytes = new Long(length);
+            activityCode = "OK";
+            activityDetails = null;
+
+            rval = true;
+            return;
+          }
+          catch (SolrServerException e)
+          {
+            // Log what happened to us
+            activityStart = new Long(fullStartTime);
+            activityBytes = new Long(length);
+            activityCode = "FAILED";
+            activityDetails = e.getMessage();
+            
+            // Use the exception text to determine the proper result.
+            if (e.getMessage().indexOf("org.apache.tika.exception.TikaException") != -1)
+            {
+              // Can't process the document, so don't keep trying.
+              rval = false;
+              return;
+            }
+
+            // Otherwise, abort the job.  This is not the right thing to do probably, but we
+            // need to know SolrJ's exception cases before we can get this logic right.
+            // MHL
+            throw e;
           }
-        }
-        catch (UnsupportedEncodingException ioe)
-        {
-          throw new ManifoldCFException("Fatal ingestion error: "+ioe.getMessage(),ioe);
         }
         catch (java.net.SocketTimeoutException ioe)
         {
           // These are just like IO errors, but since they are derived from InterruptedIOException, they have to be caught first.
           // Log the error
-          Logging.ingest.warn("Error connecting to ingestion API: "+ioe.getMessage(),ioe);
+          Logging.ingest.warn("Error indexing into Solr: "+ioe.getMessage(),ioe);
 
           activityStart = new Long(fullStartTime);
-          activityCode = "-1";
+          activityCode = "SOCKET TIMEOUT";
           activityDetails = ioe.getMessage();
 
           throw ioe;
@@ -1451,18 +817,18 @@ public class HttpPoster
             // since we really don't know what happened for sure.
             // Record the attempt
 
-            activityCode = "-103";
-            activityDetails = "Presuming an ingestion rejection: "+ioe.getMessage();
+            activityCode = "SOCKET CLOSE";
+            activityDetails = "Presuming an indexing rejection: "+ioe.getMessage();
             rval = false;
             return;
           }
 
           // Record the attempt
-          activityCode = "-1";
+          activityCode = "IO ERROR";
           activityDetails = ioe.getMessage();
 
           // Log the error
-          Logging.ingest.warn("Error communicating with Ingestion API: "+ioe.getMessage(),ioe);
+          Logging.ingest.warn("Error indexing into Solr: "+ioe.getMessage(),ioe);
 
           throw ioe;
         }
@@ -1540,117 +906,33 @@ public class HttpPoster
         // Open a socket to ingest, and to the response stream to get the post result
         try
         {
-          // Set up the socket, and the (optional) secure socket.
-          Socket socket = createSocket(responseRetries);
-          try
-          {
-            InputStream in = socket.getInputStream();
-            try
-            {
-              OutputStream out = socket.getOutputStream();
-              try
-              {
-                byte[] requestBytes = ("<delete><id>"+xmlEncode(documentURI)+"</id></delete>").getBytes("UTF-8");
-                long startTime = System.currentTimeMillis();
-                byte[] tmp = ("POST " + postRemoveAction + " HTTP/1.0\r\n").getBytes("ASCII");
-                out.write(tmp, 0, tmp.length);
-
-                // Set all the headers
-                writeCredentials(out);
-                tmp = ("Content-Length: "+Integer.toString(requestBytes.length)+"\r\n").getBytes("ASCII");
-                out.write(tmp, 0, tmp.length);
-                tmp = ("Content-Type: text/xml; charset=UTF-8\r\n\r\n").getBytes("ASCII");
-                out.write(tmp, 0, tmp.length);
-
-                out.write(requestBytes);
-
-                out.flush();
-
-                if (Logging.ingest.isDebugEnabled())
-                  Logging.ingest.debug("Delete posted");
-
-                CodeDetails cd;
-                try
-                {
-                  cd = getResponse(in);
-                }
-                catch (ServiceInterruption si)
-                {
-                  activityStart = new Long(startTime);
-                  activityCode = "-2";
-                  activityDetails = si.getMessage();
-                  throw si;
-                }
-
-                activityStart = new Long(startTime);
-                activityCode = cd.getCode();
-                activityDetails = cd.getDetails();
-
-                int codeValue = cd.getCodeValue();
-
-                if (codeValue < 0)
-                  throw new ManifoldCFException("Http protocol error");
-
-                // 200 means we got an xml document back
-                if (codeValue == 200)
-                {
-                  // Look at response XML
-                  cd.parseRemovalResponse();
-                  return;
-                }
-
-                // We ignore everything in the range from 400-500 now
-                if (codeValue == 401)
-                  throw new ManifoldCFException("Bad credentials for ingestion",ManifoldCFException.SETUP_ERROR);
-
-                if (codeValue >= 400 && codeValue < 500)
-                  return;
-
-                // Anything else means the document didn't delete.  Throw the error.
-                throw new ManifoldCFException("Error deleting document: '"+cd.getDescription()+"'");
-              }
-              finally
-              {
-                out.close();
-              }
-            }
-            finally
-            {
-              in.close();
-            }
-          }
-          finally
-          {
-            try
-            {
-              socket.close();
-            }
-            catch (InterruptedIOException e)
-            {
-              throw e;
-            }
-            catch (IOException e)
-            {
-              Logging.ingest.debug("Error closing socket: "+e.getMessage(),e);
-              // Do NOT rethrow
-            }
-          }
-        }
-        catch (UnsupportedEncodingException ioe)
-        {
-          throw new ManifoldCFException("Fatal ingestion error: "+ioe.getMessage(),ioe);
+          solrServer.deleteById(documentURI);
+            
+          // Success
+          activityStart = new Long(fullStartTime);
+          activityCode = "OK";
+          activityDetails = null;
+          return;
         }
         catch (InterruptedIOException ioe)
         {
           return;
         }
+        catch (SolrServerException e)
+        {
+          activityStart = new Long(fullStartTime);
+          activityCode = "FAILED";
+          activityDetails = e.getMessage();
+
+          throw e;
+        }
         catch (IOException ioe)
         {
           // Log the error
-          Logging.ingest.warn("Error communicating with Ingestion API: "+ioe.getMessage(),ioe);
+          Logging.ingest.warn("Error deleting document: "+ioe.getMessage(),ioe);
 
           activityStart = new Long(fullStartTime);
-          activityCode = "-1";
+          activityCode = "IO ERROR";
           activityDetails = ioe.getMessage();
 
           throw ioe;
@@ -1703,94 +985,19 @@ public class HttpPoster
     {
       try
       {
-        // Do the operation!
-        // Open a socket to update request handler, and to the response stream to get the post result
         try
         {
-          // Set up the socket, and the (optional) secure socket.
-          Socket socket = createSocket(responseRetries);
-          try
-          {
-            InputStream in = socket.getInputStream();
-            try
-            {
-              OutputStream out = socket.getOutputStream();
-              try
-              {
-                // Create the output stream to GTS
-                byte[] tmp = ("GET " + postUpdateAction + "?commit=true HTTP/1.0\r\n").getBytes("ASCII");
-                out.write(tmp, 0, tmp.length);
-
-                writeCredentials(out);
-
-                tmp = ("Content-Length: 0\r\n\r\n").getBytes("ASCII");
-                out.write(tmp, 0, tmp.length);
-
-                if (Logging.ingest.isDebugEnabled())
-                  Logging.ingest.debug("Commit request posted");
-
-                out.flush();
-
-                CodeDetails cd = getResponse(in);
-
-                int codeValue = cd.getCodeValue();
-                if (codeValue < 0)
-                  throw new ManifoldCFException("Http protocol error");
-
-                // 200 means everything went OK
-                if (codeValue == 200)
-                {
-                  cd.parseCommitResponse();
-                  return;
-                }
-
-                // We ignore everything in the range from 400-500 now
-                if (codeValue == 401)
-                  throw new ManifoldCFException("Bad credentials for commit request",ManifoldCFException.SETUP_ERROR);
-
-                // Anything else means the info request failed.
-                throw new ManifoldCFException("Error connecting to update request API: '"+cd.getDescription()+"'");
-              }
-              finally
-              {
-                out.close();
-              }
-            }
-            finally
-            {
-              in.close();
-            }
-          }
-          finally
-          {
-            try
-            {
-              socket.close();
-            }
-            catch (InterruptedIOException e)
-            {
-              throw e;
-            }
-            catch (IOException e)
-            {
-              Logging.ingest.debug("Error closing socket: "+e.getMessage(),e);
-              // Do NOT rethrow
-            }
-          }
-        }
-        catch (UnsupportedEncodingException ioe)
-        {
-          throw new ManifoldCFException("Fatal commit error: "+ioe.getMessage(),ioe);
+          // Do the operation!
+          solrServer.commit();
         }
         catch (InterruptedIOException ioe)
         {
-          // Exit the thread.
           return;
         }
         catch (IOException ioe)
         {
           // Log the error
-          Logging.ingest.warn("Error communicating with update request handler: "+ioe.getMessage(),ioe);
+          Logging.ingest.warn("Error committing: "+ioe.getMessage(),ioe);
           throw ioe;
         }
       }
@@ -1828,83 +1035,11 @@ public class HttpPoster
       try
       {
         // Do the operation!
-        // Open a socket to ingest, and to the response stream to get the post result
         try
         {
-          // Set up the socket, and the (optional) secure socket.
-          Socket socket = createSocket(responseRetries);
-          try
-          {
-            InputStream in = socket.getInputStream();
-            try
-            {
-              OutputStream out = socket.getOutputStream();
-              try
-              {
-                // Create the output stream to GTS
-                byte[] tmp = ("GET " + postStatusAction + " HTTP/1.0\r\n").getBytes("ASCII");
-                out.write(tmp, 0, tmp.length);
-
-                writeCredentials(out);
-
-                tmp = ("Content-Length: 0\r\n\r\n").getBytes("ASCII");
-                out.write(tmp, 0, tmp.length);
-
-                if (Logging.ingest.isDebugEnabled())
-                  Logging.ingest.debug("Status request posted");
-
-                out.flush();
-
-                CodeDetails cd = getResponse(in);
-
-                int codeValue = cd.getCodeValue();
-                if (codeValue < 0)
-                  throw new ManifoldCFException("Http protocol error");
-
-                // 200 means everything went OK
-                if (codeValue == 200)
-                {
-                  cd.parseStatusResponse();
-                  return;
-                }
-
-                // We ignore everything in the range from 400-500 now
-                if (codeValue == 401)
-                  throw new ManifoldCFException("Bad credentials for ingestion",ManifoldCFException.SETUP_ERROR);
-
-                // Anything else means the info request failed.
-                throw new ManifoldCFException("Error connecting to ingestion API: '"+cd.getDescription()+"'");
-              }
-              finally
-              {
-                out.close();
-              }
-            }
-            finally
-            {
-              in.close();
-            }
-          }
-          finally
-          {
-            try
-            {
-              socket.close();
-            }
-            catch (InterruptedIOException e)
-            {
-              throw e;
-            }
-            catch (IOException e)
-            {
-              Logging.ingest.debug("Error closing socket: "+e.getMessage(),e);
-              // Do NOT rethrow
-            }
-          }
-        }
-        catch (UnsupportedEncodingException ioe)
-        {
-          throw new ManifoldCFException("Fatal ingestion error: "+ioe.getMessage(),ioe);
+          // MHL to check status via SolrJ
+          if (false)
+            throw new IOException("blah");
         }
         catch (InterruptedIOException ioe)
         {
@@ -1914,7 +1049,7 @@ public class HttpPoster
         catch (IOException ioe)
         {
           // Log the error
-          Logging.ingest.warn("Error communicating with Ingestion API: "+ioe.getMessage(),ioe);
+          Logging.ingest.warn("Error checking status: "+ioe.getMessage(),ioe);
           throw ioe;
         }
       }
@@ -1930,184 +1065,63 @@ public class HttpPoster
     }
   }
 
-  /** Code+details paper object */
-  protected static class CodeDetails
+  /** Adapts a repository document's content to SolrJ's ContentStream interface.
+  * The content is exposed only as raw bytes through getStream(); the wrapped
+  * stream is handed out as-is and is never closed by this class, so its owner
+  * remains responsible for closing it.
+  */
+  protected static class RepositoryDocumentStream extends ContentStreamBase
   {
-    protected String code;
-    protected int codeValue;
-    protected String details;
-    protected String res;
-    protected XMLDoc returnDoc;
-    protected String rawString;
-
-    public CodeDetails(String res, XMLDoc returnDoc, String rawString)
-    {
-      this.res = res;
-      this.returnDoc = returnDoc;
-      this.rawString = rawString;
-      codeValue = -100;
-      code = "-100";
-      details = "Http response was improperly formed";
-
-      int firstSpace = res.indexOf(" ");
-      if (firstSpace != -1)
-      {
-        int secondSpace = res.indexOf(" ", firstSpace + 1);
-        if (secondSpace != -1)
-        {
-          code = res.substring(firstSpace + 1, secondSpace);
-          details = res.substring(secondSpace+1).trim();
-          try
-          {
-            codeValue = (int)(new Double(code).doubleValue());
-            if (codeValue == 200)
-              details = null;
-          }
-          catch (NumberFormatException e)
-          {
-            // Fall through and leave codeValue unaltered
-          }
-        }
-      }
-    }
-
-    public String getCode()
-    {
-      return code;
-    }
-
-    public int getCodeValue()
-    {
-      return codeValue;
-    }
-
-    public String getDetails()
-    {
-      return details;
-    }
-
-    public XMLDoc getReturnDoc()
-    {
-      return returnDoc;
-    }
-
-    public String getDescription()
-      throws ManifoldCFException
+    /** Document content, handed out unchanged by getStream(). */
+    protected final InputStream is;
+    /** Declared length of the content, as supplied by the caller. */
+    protected final long length;
+
+    /** Wrap a document content stream.
+    *@param is is the document content.
+    *@param length is the declared length of that content.
+    */
+    public RepositoryDocumentStream(InputStream is, long length)
     {
-      return res + "\r\n" + ((returnDoc!=null)?returnDoc.getXML():((rawString!=null)?rawString:""));
+      this.is = is;
+      this.length = length;
     }
-
-    public void parseIngestionResponse()
-      throws ManifoldCFException
+
+    /** Return the wrapped content stream; the same instance is returned on every call. */
+    @Override
+    public InputStream getStream() throws IOException
     {
-      // Look at response XML
-      String statusValue = "unknown";
-      XMLDoc doc = getReturnDoc();
-      if (doc != null)
-      {
-        if (Logging.ingest.isDebugEnabled())
-          Logging.ingest.debug("SOLR: Saw ingestion response document '"+doc.getXML()+"'");
-        //Object root = doc.getRoot();
-        ArrayList list = doc.processPath("*",null);
-        int k = 0;
-        while (k < list.size())
-        {
-          Object listNode = list.get(k++);
-          if (doc.getNodeName(listNode).equals("response"))
-          {
-            ArrayList list2 = doc.processPath("*",listNode);
-            int q = 0;
-            while (q < list2.size())
-            {
-              Object respNode = list2.get(q++);
-              if (doc.getNodeName(respNode).equals("lst"))
-              {
-                String lstName = doc.getValue(respNode,"name");
-                if (lstName.equals("responseHeader"))
-                {
-                  ArrayList list3 = doc.processPath("*",respNode);
-                  int z = 0;
-                  while (z < list3.size())
-                  {
-                    Object headerNode = list3.get(z++);
-                    if (doc.getNodeName(headerNode).equals("int"))
-                    {
-                      String value = doc.getValue(headerNode,"name");
-                      if (value.equals("status"))
-                      {
-                        statusValue = doc.getData(headerNode).trim();
-                      }
-                    }
-                  }
-                }
-              }
-            }
-          }
-        }
-        if (statusValue.equals("0"))
-          return;
-
-        throw new ManifoldCFException("Ingestion returned error: "+statusValue);
-      }
-      else
-        throw new ManifoldCFException("XML parsing error on response");
+      return is;
     }
-
-    public void parseRemovalResponse()
-      throws ManifoldCFException
+
+    /** No character view is provided: the content is declared as binary
+    * (application/octet-stream), so it should be consumed through getStream().
+    */
+    @Override
+    public Reader getReader() throws IOException
     {
-      parseIngestionResponse();
+      // NOTE(review): callers that use getReader() will get null; confirm SolrJ only
+      // reads this stream via getStream() before relying on that.
+      return null;
     }
 
-    public void parseCommitResponse()
-      throws ManifoldCFException
+    /** Document content is always sent as an opaque binary payload. */
+    @Override
+    public String getContentType()
     {
-      parseIngestionResponse();
+      return "application/octet-stream";
     }
-    
-    public void parseStatusResponse()
-      throws ManifoldCFException
-    {
-      // Look at response XML
-      String statusValue = "unknown";
-      XMLDoc doc = getReturnDoc();
-      if (doc != null)
-      {
-        if (Logging.ingest.isDebugEnabled())
-          Logging.ingest.debug("SOLR: Saw status response document '"+doc.getXML()+"'");
-        //Object root = doc.getRoot();
-        ArrayList list = doc.processPath("*",null);
-        int k = 0;
-        while (k < list.size())
-        {
-          Object listNode = list.get(k++);
-          if (doc.getNodeName(listNode).equals("response"))
-          {
-            ArrayList list2 = doc.processPath("*",listNode);
-            int q = 0;
-            while (q < list2.size())
-            {
-              Object respNode = list2.get(q++);
-              if (doc.getNodeName(respNode).equals("str"))
-              {
-                String value = doc.getValue(respNode,"name");
-                if (value.equals("status"))
-                {
-                  statusValue = doc.getData(respNode).trim();
-                }
-              }
-            }
-          }
-        }
-        if (statusValue.equals("OK"))
-          return;
+
+    /** Report the declared content length; ContentStreamBase would otherwise return
+    * its own size field, which this class never assigns.
+    */
+    @Override
+    public Long getSize()
+    {
+      return length;
+    }
 
-        throw new ManifoldCFException("Status error: "+statusValue);
-      }
-      else
-        throw new ManifoldCFException("XML parsing error on response");
-    }
   }
-
+
 }
 

Modified: manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConfig.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConfig.java?rev=1427229&r1=1427228&r2=1427229&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConfig.java (original)
+++ manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConfig.java Mon Dec 31 19:46:25 2012
@@ -27,12 +27,47 @@ public class SolrConfig
 
   // Configuration parameters
 
+  /** Solr type */
+  public static final String PARAM_SOLR_TYPE = "Solr type";
+  /** Type: Standard */
+  public static final String SOLR_TYPE_STANDARD = "standard";
+  /** Type: Solr Cloud */
+  public static final String SOLR_TYPE_SOLRCLOUD = "solrcloud";
+  
+  // SolrCloud zookeeper parameters
+  
+  // Zookeeper hosts, as nodes
+  /** Zookeeper node */
+  public static final String NODE_ZOOKEEPER = "zookeeper";
+  /** Zookeeper hostname */
+  public static final String ATTR_HOST = "host";
+  /** Zookeeper port */
+  public static final String ATTR_PORT = "port";
+  
+  /** ZooKeeper client timeout */
+  public static final String PARAM_ZOOKEEPER_CLIENT_TIMEOUT = "ZooKeeper client timeout";
+  /** ZooKeeper connect timeout */
+  public static final String PARAM_ZOOKEEPER_CONNECT_TIMEOUT = "ZooKeeper connect timeout";
+  /** Collection name */
+  public static final String PARAM_COLLECTION = "Collection";
+  
+  // General indexing parameters
+  
   /** Protocol */
   public static final String PARAM_PROTOCOL = "Server protocol";
+  /** Protocol: http */
+  public static final String PROTOCOL_TYPE_HTTP = "http";
+  /** Protocol: https */
+  public static final String PROTOCOL_TYPE_HTTPS = "https";
+  
   /** Server name */
   public static final String PARAM_SERVER = "Server name";
   /** Port */
   public static final String PARAM_PORT = "Server port";
+  /** Connection timeout */
+  public static final String PARAM_CONNECTION_TIMEOUT = "Connection timeout";
+  /** Socket timeout */
+  public static final String PARAM_SOCKET_TIMEOUT = "Socket timeout";
   /** Webapp */
   public static final String PARAM_WEBAPPNAME = "Server web application";
   /** Core */