You are viewing a plain text version of this content. The canonical version is the original HTML message in the mailing-list archive (the hyperlink was lost in this plain-text copy).
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/12/31 20:46:25 UTC
svn commit: r1427229 [1/2] - in
/manifoldcf/branches/CONNECTORS-594/connectors/solr: ./
connector/src/main/java/org/apache/manifoldcf/agents/output/solr/
connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/solr/
Author: kwright
Date: Mon Dec 31 19:46:25 2012
New Revision: 1427229
URL: http://svn.apache.org/viewvc?rev=1427229&view=rev
Log:
Port Solr connector to Solr-J
Modified:
manifoldcf/branches/CONNECTORS-594/connectors/solr/build.xml
manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConfig.java
manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConnector.java
manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/solr/common_en_US.properties
manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/solr/common_ja_JP.properties
Modified: manifoldcf/branches/CONNECTORS-594/connectors/solr/build.xml
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-594/connectors/solr/build.xml?rev=1427229&r1=1427228&r2=1427229&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-594/connectors/solr/build.xml (original)
+++ manifoldcf/branches/CONNECTORS-594/connectors/solr/build.xml Mon Dec 31 19:46:25 2012
@@ -19,6 +19,30 @@
<import file="../connector-build.xml"/>
+ <path id="connector-classpath">
+ <path refid="mcf-connector-build.connector-classpath"/>
+ <fileset dir="../../lib">
+ <include name="solr-solrj*.jar"/>
+ <include name="httpmime*.jar"/>
+ <include name="zookeeper*.jar"/>
+ <include name="wstx-asl*.jar"/>
+ <include name="jcl-over-slf4j*.jar"/>
+ </fileset>
+ </path>
+
+ <target name="lib" depends="mcf-connector-build.lib,precompile-check" if="canBuild">
+ <mkdir dir="dist/lib"/>
+ <copy todir="dist/lib">
+ <fileset dir="../../lib">
+ <include name="solr-solrj*.jar"/>
+ <include name="httpmime*.jar"/>
+ <include name="zookeeper*.jar"/>
+ <include name="wstx-asl*.jar"/>
+ <include name="jcl-over-slf4j*.jar"/>
+ </fileset>
+ </copy>
+ </target>
+
<target name="integration">
<mkdir dir="dist/integration/solr-3.x"/>
<copy todir="dist/integration/solr-3.x">
Modified: manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java?rev=1427229&r1=1427228&r2=1427229&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java (original)
+++ manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java Mon Dec 31 19:46:25 2012
@@ -32,6 +32,33 @@ import javax.net.ssl.*;
import org.apache.log4j.*;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.client.params.ClientPNames;
+import org.apache.http.client.params.HttpClientParams;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.conn.ssl.AllowAllHostnameVerifier;
+import org.apache.http.conn.params.ConnRoutePNames;
+
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
+import org.apache.solr.client.solrj.impl.XMLResponseParser;
+import org.apache.solr.common.util.ContentStreamBase;
+
/**
* Posts an input stream to SOLR
*
@@ -52,85 +79,81 @@ public class HttpPoster
public static String ingestPasswordProperty = "org.apache.manifoldcf.ingest.password";
public static String ingestMaxConnectionsProperty = "org.apache.manifoldcf.ingest.maxconnections";
- // Chunk size for base64-encoded headers
- protected final static int HEADER_CHUNK = 4096;
-
- private String protocol;
- private String host;
- private javax.net.ssl.SSLSocketFactory socketFactory;
- private int port;
- private String encodedCredentials;
- private String realm;
+ // Solrj connection-associated objects
+ protected ClientConnectionManager connectionManager = null;
+ protected SolrServer solrServer = null;
+
+ // Action URI pieces
private String postUpdateAction;
private String postRemoveAction;
private String postStatusAction;
+
+ // Attribute names
private String allowAttributeName;
private String denyAttributeName;
private String idAttributeName;
+ // Document max length
private Long maxDocumentLength;
+ // Commit-within flag
private String commitWithin;
-
+
+ // Constants we need
private static final String LITERAL = "literal.";
private static final String NOTHING = "__NOTHING__";
private static final String ID_METADATA = "lcf_metadata_id";
private static final String COMMITWITHIN_METADATA = "commitWithin";
- private int buffersize = 32768; // default buffer size
- double sizeCoefficient = 0.0005; // 20 ms additional timeout per 2000 bytes, pulled out of my butt
- /** the number of times we should poll for the response */
- int responseRetries = 9000; // Long basic wait: 3 minutes. This will also be added to by a term based on the size of the request.
- /** how long we should wait before checking for a new stream */
- long responseRetryWait = 20L;
/** How long to wait before retrying a failed ingestion */
- long interruptionRetryTime = 60000L;
-
- /** The multipart separator we're going to use. I was thinking of including a random number, but that would wreck repeatability */
- protected static byte[] separatorBytes = null;
- protected static byte[] endBytes = null;
- protected static byte[] postambleBytes = null;
- protected static byte[] preambleBytes = null;
- static
- {
- try
- {
- String separatorString = "------------------T-H-I-S--I-S--A--S-E-P-A-R-A-T-O-R--399123410141511";
- separatorBytes = (separatorString+"\r\n").getBytes("ASCII");
- endBytes = ("--"+separatorString+"--\r\n").getBytes("ASCII");
- postambleBytes = "\r\n".getBytes("ASCII");
- preambleBytes = "--".getBytes("ASCII");
- }
- catch (java.io.UnsupportedEncodingException e)
- {
- e.printStackTrace();
- System.exit(1);
- }
- }
+ private static final long interruptionRetryTime = 60000L;
- /** This is the secure socket factory we will use. I'm presuming it's thread-safe, but
- * if not, synchronization blocks are in order when it's used. */
- protected static javax.net.ssl.SSLSocketFactory openSecureSocketFactory = null;
- static
+ /** Initialize the SolrCloud http poster.
+ */
+ public HttpPoster(String zookeeperHosts, String collection,
+ int zkClientTimeout, int zkConnectTimeout,
+ String updatePath, String removePath, String statusPath,
+ String realm, String userID, String password,
+ String allowAttributeName, String denyAttributeName, String idAttributeName,
+ IKeystoreManager keystoreManager, Long maxDocumentLength,
+ String commitWithin)
+ throws ManifoldCFException
{
+ // These are the paths to the handlers in Solr that deal with the actions we need to do
+ this.postUpdateAction = updatePath;
+ this.postRemoveAction = removePath;
+ this.postStatusAction = statusPath;
+
+ this.commitWithin = commitWithin;
+
+ this.allowAttributeName = allowAttributeName;
+ this.denyAttributeName = denyAttributeName;
+ this.idAttributeName = idAttributeName;
+
+ this.maxDocumentLength = maxDocumentLength;
+
+ // What to do with basic auth and keystore manager parameters? Do they even make sense?
+ // MHL
+
try
{
- openSecureSocketFactory = getOpenSecureSocketFactory();
+ CloudSolrServer cloudSolrServer = new CloudSolrServer(zookeeperHosts);
+ cloudSolrServer.setZkClientTimeout(zkClientTimeout);
+ cloudSolrServer.setZkConnectTimeout(zkConnectTimeout);
+ cloudSolrServer.setDefaultCollection(collection);
+ // Set the solrj instance we want to use
+ solrServer = cloudSolrServer;
}
- catch (ManifoldCFException e)
+ catch (MalformedURLException e)
{
- // If we can't create, print and fail
- e.printStackTrace();
- System.exit(100);
+ throw new ManifoldCFException(e.getMessage(),e);
}
}
- /**
- * Initialized the http poster.
- * @param userID is the unencoded user name, or null.
- * @param password is the unencoded password, or null.
+ /** Initialize the standard http poster.
*/
- public HttpPoster(String protocol, String server, int port, String webappName,
+ public HttpPoster(String protocol, String server, int port, String webapp, String core,
+ int connectionTimeout, int socketTimeout,
String updatePath, String removePath, String statusPath,
String realm, String userID, String password,
String allowAttributeName, String denyAttributeName, String idAttributeName,
@@ -138,6 +161,11 @@ public class HttpPoster
String commitWithin)
throws ManifoldCFException
{
+ // These are the paths to the handlers in Solr that deal with the actions we need to do
+ this.postUpdateAction = updatePath;
+ this.postRemoveAction = removePath;
+ this.postStatusAction = statusPath;
+
this.commitWithin = commitWithin;
this.allowAttributeName = allowAttributeName;
@@ -145,54 +173,83 @@ public class HttpPoster
this.idAttributeName = idAttributeName;
this.maxDocumentLength = maxDocumentLength;
-
- this.host = server;
- this.port = port;
- this.protocol = protocol;
+
+ String location = "";
+ if (webapp != null)
+ location = "/" + webapp;
+ if (core != null)
+ {
+ if (webapp == null)
+ throw new ManifoldCFException("Webapp must be specified if core is specified.");
+ location += "/" + core;
+ }
+
+ // Initialize standard solr-j.
+ // First, we need an HttpClient where basic auth is properly set up.
+ PoolingClientConnectionManager localConnectionManager = new PoolingClientConnectionManager();
+ localConnectionManager.setMaxTotal(1);
+ SSLSocketFactory myFactory;
if (keystoreManager != null)
- this.socketFactory = keystoreManager.getSecureSocketFactory();
+ {
+ myFactory = new SSLSocketFactory(keystoreManager.getSecureSocketFactory(),
+ new AllowAllHostnameVerifier());
+ }
else
- // Use the "trust everything" one.
- this.socketFactory = openSecureSocketFactory;
-
+ {
+ // Use the "trust everything" one
+ myFactory = new SSLSocketFactory(KeystoreManagerFactory.getTrustingSecureSocketFactory(),
+ new AllowAllHostnameVerifier());
+ }
+ Scheme myHttpsProtocol = new Scheme("https", 443, myFactory);
+ localConnectionManager.getSchemeRegistry().register(myHttpsProtocol);
+ connectionManager = localConnectionManager;
+
+ BasicHttpParams params = new BasicHttpParams();
+ params.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY,true);
+ params.setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK,false);
+ params.setBooleanParameter(ClientPNames.ALLOW_CIRCULAR_REDIRECTS,true);
+ params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT,socketTimeout);
+ params.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT,connectionTimeout);
+ params.setBooleanParameter(ClientPNames.HANDLE_REDIRECTS,true);
+ DefaultHttpClient localClient = new DefaultHttpClient(connectionManager,params);
+
if (userID != null && userID.length() > 0 && password != null)
{
- try
- {
- encodedCredentials = new org.apache.manifoldcf.core.common.Base64().encodeByteArray((userID+":"+password).getBytes("UTF-8"));
- }
- catch (java.io.UnsupportedEncodingException e)
- {
- throw new ManifoldCFException("Couldn't convert to utf-8 bytes: "+e.getMessage(),e);
- }
- this.realm = realm;
+ Credentials credentials = new UsernamePasswordCredentials(userID, password);
+ if (realm != null)
+ localClient.getCredentialsProvider().setCredentials(new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, realm), credentials);
+ else
+ localClient.getCredentialsProvider().setCredentials(AuthScope.ANY, credentials);
}
- else
- encodedCredentials = null;
-
- String postURI = protocol + "://" + server + ":" + Integer.toString(port);
- if (webappName.length() > 0)
- webappName = "/" + webappName;
-
- postUpdateAction = webappName + updatePath;
- postRemoveAction = webappName + removePath;
- postStatusAction = webappName + statusPath;
-
- String x = ManifoldCF.getProperty(ingestBufferSizeProperty);
- if (x != null && x.length() > 0)
- buffersize = new Integer(x).intValue();
- x = ManifoldCF.getProperty(ingestResponseRetryCount);
- if (x != null && x.length() > 0)
- responseRetries = new Integer(x).intValue();
- x = ManifoldCF.getProperty(ingestResponseRetryInterval);
- if (x != null && x.length() > 0)
- responseRetryWait = new Long(x).longValue();
- x = ManifoldCF.getProperty(ingestRescheduleInterval);
- if (x != null && x.length() > 0)
- interruptionRetryTime = new Long(x).longValue();
+ String httpSolrServerUrl = protocol + "://" + server + ":" + port + location;
+ HttpSolrServer httpSolrServer = new HttpSolrServer(httpSolrServerUrl, localClient);
+ // For portability with older versions of Solr
+ httpSolrServer.setParser(new XMLResponseParser());
+ // Set the solrj instance we want to use
+ solrServer = httpSolrServer;
}
+ /** Poll the poster.
+ */
+ public void poll()
+ {
+ if (connectionManager != null)
+ connectionManager.closeIdleConnections(60000L,TimeUnit.MILLISECONDS);
+ }
+
+ /** Shut down the poster.
+ */
+ public void shutdown()
+ {
+ if (solrServer != null)
+ solrServer.shutdown();
+ solrServer = null;
+ if (connectionManager != null)
+ connectionManager.shutdown();
+ connectionManager = null;
+ }
+
/** Cause a commit to happen.
*/
public void commitPost()
@@ -216,10 +273,8 @@ public class HttpPoster
Throwable thr = t.getException();
if (thr != null)
{
- if (thr instanceof ServiceInterruption)
- throw (ServiceInterruption)thr;
- if (thr instanceof ManifoldCFException)
- throw (ManifoldCFException)thr;
+ if (thr instanceof SolrServerException)
+ throw (SolrServerException)thr;
if (thr instanceof IOException)
throw (IOException)thr;
if (thr instanceof RuntimeException)
@@ -235,6 +290,10 @@ public class HttpPoster
throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
}
}
+ catch (SolrServerException e)
+ {
+ throw new ManifoldCFException("Solr exception during commit: "+e.getMessage(),e);
+ }
catch (IOException ioe)
{
if (ioErrorRetry == 0)
@@ -248,7 +307,7 @@ public class HttpPoster
true);
}
}
-
+
// Go back around again!
// Sleep for a time, and retry
try
@@ -265,6 +324,7 @@ public class HttpPoster
}
+
/**
* Post the input stream to ingest
* @param documentURI is the document's uri.
@@ -321,10 +381,8 @@ public class HttpPoster
Throwable thr = t.getException();
if (thr != null)
{
- if (thr instanceof ServiceInterruption)
- throw (ServiceInterruption)thr;
- if (thr instanceof ManifoldCFException)
- throw (ManifoldCFException)thr;
+ if (thr instanceof SolrServerException)
+ throw (SolrServerException)thr;
if (thr instanceof IOException)
throw (IOException)thr;
if (thr instanceof RuntimeException)
@@ -340,13 +398,17 @@ public class HttpPoster
throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
}
}
+ catch (SolrServerException e)
+ {
+ throw new ManifoldCFException("Solr exception during indexing: "+e.getMessage(),e);
+ }
catch (java.net.SocketTimeoutException ioe)
{
if (readFromDocumentStreamYet || ioErrorRetry == 0)
{
// If this continues, we should indeed abort the job. Retries should not go on indefinitely either; 2 hours is plenty
long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("IO error connecting to ingestion API: "+ioe.getMessage()+"; ingestion will be retried again later",
+ throw new ServiceInterruption("IO error indexing: "+ioe.getMessage()+"; ingestion will be retried again later",
ioe,
currentTime + interruptionRetryTime,
currentTime + 2L * 60L * 60000L,
@@ -409,10 +471,8 @@ public class HttpPoster
Throwable thr = t.getException();
if (thr != null)
{
- if (thr instanceof ServiceInterruption)
- throw (ServiceInterruption)thr;
- if (thr instanceof ManifoldCFException)
- throw (ManifoldCFException)thr;
+ if (thr instanceof SolrServerException)
+ throw (SolrServerException)thr;
if (thr instanceof IOException)
throw (IOException)thr;
if (thr instanceof RuntimeException)
@@ -428,6 +488,10 @@ public class HttpPoster
throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
}
}
+ catch (SolrServerException e)
+ {
+ throw new ManifoldCFException("Solr exception during check: "+e.getMessage(),e);
+ }
catch (IOException ioe)
{
if (ioErrorRetry == 0)
@@ -485,10 +549,8 @@ public class HttpPoster
Throwable thr = t.getException();
if (thr != null)
{
- if (thr instanceof ServiceInterruption)
- throw (ServiceInterruption)thr;
- if (thr instanceof ManifoldCFException)
- throw (ManifoldCFException)thr;
+ if (thr instanceof SolrServerException)
+ throw (SolrServerException)thr;
if (thr instanceof IOException)
throw (IOException)thr;
if (thr instanceof RuntimeException)
@@ -504,6 +566,10 @@ public class HttpPoster
throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
}
}
+ catch (SolrServerException e)
+ {
+ throw new ManifoldCFException("Solr exception during delete: "+e.getMessage(),e);
+ }
catch (IOException ioe)
{
if (ioErrorRetry == 0)
@@ -559,461 +625,27 @@ public class HttpPoster
return new String[0];
}
- /**
- * Read an ascii line from an input stream
- */
- protected static String readLine(InputStream in)
- throws IOException
- {
- ByteBuffer bb = new ByteBuffer();
- while (true)
- {
- int x = in.read();
- if (x == -1)
- throw new IOException("Unexpected EOF");
- if (x == 13)
- continue;
- if (x == 10)
- break;
-
- bb.append((byte)x);
- }
- return bb.toString("ASCII");
- }
-
- /**
- * Get the response code of the post
- * @param in the stream the response is going to come from
- * @return the response details.
- * @throws ManifoldCFException
- */
- protected CodeDetails getResponse(InputStream in) throws ManifoldCFException, ServiceInterruption
- {
- Logging.ingest.debug("Waiting for response stream");
-
- try
- {
- // Stream.ready() always returns false for secure sockets :-(. So
- // we have to rely on socket timeouts to interrupt us if the server goes down.
- String responseCode = readLine(in);
-
- if (Logging.ingest.isDebugEnabled())
- Logging.ingest.debug("Response code from ingest: '" + responseCode + "'");
-
- // Read the response headers
- String contentType = "text/plain; charset=iso-8859-1";
- while (true)
- {
- String headerLine = readLine(in);
- if (headerLine.length() == 0)
- break;
- // Look for the headers we care about, ignore the rest...
- int spaceIndex = headerLine.indexOf(" ");
- if (spaceIndex != -1)
- {
- String headerName = headerLine.substring(0,spaceIndex);
- String headerValue = headerLine.substring(spaceIndex).trim().toLowerCase();
- if (headerName.toLowerCase().equals("content-type:"))
- {
- contentType = headerValue;
- }
- }
- }
-
- // Now read the response data. It's safe to assemble the data in memory.
- int charsetIndex = contentType.indexOf("charset=");
- String charsetName = "iso-8859-1";
- if (charsetIndex != -1)
- charsetName = contentType.substring(charsetIndex+8);
-
- // NOTE: We may get back an unparseable pile of goo here, especially if the error is a 500 error.
- // But we can't hand the binary to the XML parser and still be able to get at the raw data. So we
- // read the data into memory first (as binary), and then make a decision based on parseability as to whether
- // we attempt to decode it.
- byte[] responseContent = readInputStream(in);
-
- XMLDoc doc = null;
- String rawString = null;
- try
- {
- doc = new XMLDoc(new ByteArrayInputStream(responseContent));
- }
- catch (ManifoldCFException e)
- {
- // Syntax errors should be eaten; we'll just return a null doc in that case.
- // But we do try to convert the raw data to string form.
- try
- {
- rawString = new String(responseContent,charsetName);
- }
- catch (UnsupportedEncodingException e2)
- {
- // Uh oh, can't even convert it to a string. Now we are desperate.
- rawString = "Response had an illegal encoding: "+e2.getMessage();
- e.printStackTrace();
- }
- }
-
- Logging.ingest.debug("Read of response stream complete");
- return new CodeDetails(responseCode,doc,rawString);
- }
- catch (java.net.SocketTimeoutException e)
- {
- // If this continues, we should indeed abort the job. Retries should not go on indefinitely either; 2 hours is plenty
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("Ingestion API socket timeout exception waiting for response code: "+e.getMessage()+"; ingestion will be retried again later",
- e,
- currentTime + interruptionRetryTime,
- currentTime + 2L * 60L * 60000L,
- -1,
- true);
- }
- catch (InterruptedIOException e)
- {
- throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
- }
- catch (java.net.ConnectException e)
- {
- // If this continues, we should indeed abort the job. Retries should not go on indefinitely either; 2 hours is plenty
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("Timed out connecting to ingestion API: "+e.getMessage()+"; ingestion will be retried again later",
- e,
- currentTime + interruptionRetryTime,
- currentTime + 2L * 60L * 60000L,
- -1,
- true);
- }
- catch (java.net.SocketException e)
- {
- // Return 400 error; likely a connection reset which lost us the response data, so
- // just treat it as something OK.
- return new CodeDetails("HTTP/1.0 400 Connection Reset",null,null);
-
- }
- catch (IOException ioe)
- {
- Logging.ingest.warn("IO exception trying to get response from ingestion API: "+ioe.getMessage(),ioe);
- // If this continues, we should indeed abort the job. Retries should not go on indefinitely either; 2 hours is plenty
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("IO exception waiting for response code: "+ioe.getMessage()+"; ingestion will be retried again later",
- ioe,
- currentTime + interruptionRetryTime,
- currentTime + 2L * 60L * 60000L,
- -1,
- true);
- }
- }
-
- /** Read input stream into an in-memory array */
- protected static byte[] readInputStream(InputStream is)
- throws IOException
- {
- // Create an array of byte arrays, and assemble the result into a final piece at the end.
- List array = new ArrayList();
- int count = 0;
- while (true)
- {
- byte[] buffer = new byte[65536];
- int amt = is.read(buffer);
- if (amt == -1)
- break;
- count += amt;
- array.add(buffer);
- }
- byte[] rval = new byte[count];
- int pointer = 0;
- int index = 0;
- while (pointer < count)
- {
- byte[] buffer = (byte[])array.get(index++);
- if (buffer.length > count-pointer)
- {
- System.arraycopy(buffer,0,rval,pointer,count-pointer);
- pointer = count;
- }
- else
- {
- System.arraycopy(buffer,0,rval,pointer,buffer.length);
- pointer += buffer.length;
- }
- }
- return rval;
- }
-
- /** Write credentials to output */
- protected void writeCredentials(OutputStream out)
- throws IOException
- {
- // Apply credentials if present
- if (encodedCredentials != null)
- {
- Logging.ingest.debug("Applying credentials");
- byte[] tmp = ("Authorization: Basic " + encodedCredentials + "\r\n").getBytes("UTF-8");
- out.write(tmp, 0, tmp.length);
-
- tmp = ("WWW-Authenticate: Basic realm=\"" + ((realm != null) ? realm : "") + "\"\r\n").getBytes("UTF-8");
- out.write(tmp, 0, tmp.length);
- }
- }
-
- /** Build a secure socket factory based on no keystore and a lax trust manager.
- * This allows use of SSL for privacy but not identification. */
- protected static javax.net.ssl.SSLSocketFactory getOpenSecureSocketFactory()
- throws ManifoldCFException
- {
- try
- {
- java.security.SecureRandom secureRandom = java.security.SecureRandom.getInstance("SHA1PRNG");
-
- // Create an SSL context
- javax.net.ssl.SSLContext sslContext = javax.net.ssl.SSLContext.getInstance("SSL");
- sslContext.init(null,new LaxTrustManager[]{new LaxTrustManager()},secureRandom);
- return sslContext.getSocketFactory();
- }
- catch (java.security.NoSuchAlgorithmException e)
- {
- throw new ManifoldCFException("No such algorithm: "+e.getMessage(),e);
- }
- catch (java.security.KeyManagementException e)
- {
- throw new ManifoldCFException("Key management exception: "+e.getMessage(),e);
- }
- }
-
- /** Create a socket in a manner consistent with all of our specified parameters.
- */
- protected Socket createSocket(long responseRetryCount)
- throws IOException, ManifoldCFException
- {
- Socket socket;
- if (protocol.equals("https") && socketFactory != null)
- {
- try
- {
- //SocketFactory factory = SSLSocketFactory.getDefault();
- socket = socketFactory.createSocket(host,port);
- }
- catch (InterruptedIOException e)
- {
- throw e;
- }
- catch (IOException e)
- {
- throw new ManifoldCFException("Couldn't set up SSL connection to ingestion API: "+e.getMessage(),e);
- }
- }
- else
- socket = new Socket(host, port);
-
- // Calculate the timeout we want
- long timeoutMilliseconds = responseRetryWait * responseRetryCount;
- socket.setSoTimeout((int)timeoutMilliseconds);
-
- return socket;
- }
-
- /** Byte buffer class */
- protected static class ByteBuffer
- {
- byte[] theBuffer;
- int bufferAmt;
-
- public ByteBuffer()
- {
- createBuffer(64);
- }
-
- protected void createBuffer(int size)
- {
- theBuffer = new byte[size];
- }
-
- public void append(byte b)
- {
- if (bufferAmt == theBuffer.length)
- {
- byte[] oldBuffer = theBuffer;
- createBuffer(bufferAmt * 2);
- int i = 0;
- while (i < bufferAmt)
- {
- theBuffer[i] = oldBuffer[i];
- i++;
- }
- }
- theBuffer[bufferAmt++] = b;
- }
-
- public String toString(String encoding)
- throws java.io.UnsupportedEncodingException
- {
- return new String(theBuffer,0,bufferAmt,encoding);
- }
-
- }
-
- /** Our own trust manager, which ignores certificate issues */
- protected static class LaxTrustManager implements X509TrustManager
- {
- /** Does nothing */
- public LaxTrustManager()
- {
- }
-
- /** Return a list of accepted issuers. There are none. */
- public java.security.cert.X509Certificate[] getAcceptedIssuers()
- {
- return new java.security.cert.X509Certificate[0];
- }
-
- /** We have no problem with any clients */
- public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType)
- throws java.security.cert.CertificateException
- {
- }
-
- /** We have no problem with any servers */
- public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType)
- throws java.security.cert.CertificateException
- {
- }
-
- }
-
- /** Calculate the length of the preamble */
- protected static int lengthPreamble()
- throws IOException
- {
- return preambleBytes.length;
- }
-
- /** Calculate the length of a boundary */
- protected static int lengthBoundary(String contentType, String name, String fileName)
- throws IOException
- {
- int rval = 0;
- rval += separatorBytes.length;
- String value = "Content-Disposition: form-data";
- if (name != null)
- value += "; name=\""+name+"\"";
- if (fileName != null)
- value += "; filename=\""+fileName+"\"";
- value += "\r\n";
- byte[] tmp = value.getBytes("UTF-8");
- rval += tmp.length;
- tmp = ("Content-Type: "+contentType+"\r\n\r\n").getBytes("ASCII");
- rval += tmp.length;
- return rval;
- }
-
- /** Calculate the length of the postamble */
- protected static int lengthPostamble()
- throws IOException
- {
- return postambleBytes.length;
- }
-
- /** Calculate the length of a field */
- protected static int lengthField(String fieldName, String fieldValue)
- throws IOException
- {
- int rval = lengthPreamble() + lengthBoundary("text/plain; charset=UTF-8",fieldName,null);
- byte[] tmp = fieldValue.getBytes("UTF-8");
- rval += tmp.length;
- rval += lengthPostamble();
- return rval;
- }
-
- /** Count the size of an acl level */
- protected int lengthACLs(String aclType, String[] acl, String[] denyAcl)
- throws IOException
- {
- int totalLength = 0;
- String metadataACLName = LITERAL + allowAttributeName + aclType;
- int i = 0;
- while (i < acl.length)
- {
- totalLength += lengthField(metadataACLName,acl[i++]);
- }
- String metadataDenyACLName = LITERAL + denyAttributeName + aclType;
- i = 0;
- while (i < denyAcl.length)
- {
- totalLength += lengthField(metadataDenyACLName,denyAcl[i++]);
- }
- return totalLength;
- }
-
- /** Write the preamble */
- protected static void writePreamble(OutputStream out)
- throws IOException
- {
- out.write(preambleBytes, 0, preambleBytes.length);
- }
-
- /** Write a boundary */
- protected static void writeBoundary(OutputStream out, String contentType, String name, String fileName)
- throws IOException
- {
- out.write(separatorBytes, 0, separatorBytes.length);
- String value = "Content-Disposition: form-data";
- if (name != null)
- value += "; name=\""+name+"\"";
- if (fileName != null)
- value += "; filename=\""+fileName+"\"";
- value += "\r\n";
- byte[] tmp = value.getBytes("UTF-8");
- out.write(tmp, 0, tmp.length);
- tmp = ("Content-Type: "+contentType+"\r\n\r\n").getBytes("ASCII");
- out.write(tmp, 0, tmp.length);
- }
-
- /** Write the postamble */
- protected static void writePostamble(OutputStream out)
- throws IOException
- {
- out.write(postambleBytes, 0, postambleBytes.length);
- }
-
/** Write a field */
- protected static void writeField(OutputStream out, String fieldName, String fieldValue)
- throws IOException
+ protected static void writeField(ContentStreamUpdateRequest out, String fieldName, String fieldValue)
{
- writePreamble(out);
- writeBoundary(out,"text/plain; charset=UTF-8",fieldName,null);
- byte[] tmp = fieldValue.getBytes("UTF-8");
- out.write(tmp, 0, tmp.length);
- writePostamble(out);
+ out.setParam(fieldName,fieldValue);
}
-
/** Output an acl level */
- protected void writeACLs(OutputStream out, String aclType, String[] acl, String[] denyAcl)
- throws IOException
+ protected void writeACLs(ContentStreamUpdateRequest out, String aclType, String[] acl, String[] denyAcl)
{
String metadataACLName = LITERAL + allowAttributeName + aclType;
- int i = 0;
- while (i < acl.length)
+ for (int i = 0; i < acl.length; i++)
{
- writeField(out,metadataACLName,acl[i++]);
+ writeField(out,metadataACLName,acl[i]);
}
String metadataDenyACLName = LITERAL + denyAttributeName + aclType;
- i = 0;
- while (i < denyAcl.length)
+ for (int i = 0; i < denyAcl.length; i++)
{
- writeField(out,metadataDenyACLName,denyAcl[i++]);
+ writeField(out,metadataDenyACLName,denyAcl[i]);
}
}
- /** XML encoding */
- protected static String xmlEncode(String input)
- {
- StringBuilder sb = new StringBuilder("<![CDATA[");
- sb.append(input);
- sb.append("]]>");
- return sb.toString();
- }
-
/** Killable thread that does ingestions.
* Java 1.5 stopped permitting thread interruptions to abort socket waits. As a result, it is impossible to get threads to shutdown cleanly that are doing
* such waits. So, the places where this happens are segregated in their own threads so that they can be just abandoned.
@@ -1024,8 +656,8 @@ public class HttpPoster
{
protected String documentURI;
protected RepositoryDocument document;
- protected Map arguments;
- protected Map sourceTargets;
+ protected Map<String,List<String>> arguments;
+ protected Map<String,String> sourceTargets;
protected String[] shareAcls;
protected String[] shareDenyAcls;
protected String[] acls;
@@ -1040,7 +672,8 @@ public class HttpPoster
protected boolean readFromDocumentStreamYet = false;
protected boolean rval = false;
- public IngestThread(String documentURI, RepositoryDocument document, Map arguments, Map sourceTargets,
+ public IngestThread(String documentURI, RepositoryDocument document,
+ Map<String,List<String>> arguments, Map<String,String> sourceTargets,
String[] shareAcls, String[] shareDenyAcls, String[] acls, String[] denyAcls, String commitWithin)
{
super();
@@ -1069,366 +702,99 @@ public class HttpPoster
// Open a socket to ingest, and to the response stream to get the post result
try
{
- // Set up the socket, and the (optional) secure socket.
- long responseRetryCount = responseRetries + (long)((float)length * sizeCoefficient);
- Socket socket = createSocket(responseRetryCount);
- try
+ ContentStreamUpdateRequest contentStreamUpdateRequest = new ContentStreamUpdateRequest(postUpdateAction);
+
+ // Write the id field
+ writeField(contentStreamUpdateRequest,LITERAL+idAttributeName,documentURI);
+
+ // Write the access token information
+ writeACLs(contentStreamUpdateRequest,"share",shareAcls,shareDenyAcls);
+ writeACLs(contentStreamUpdateRequest,"document",acls,denyAcls);
+
+ // Write the arguments
+ for (String name : arguments.keySet())
{
- InputStream in = socket.getInputStream();
- try
+ List<String> values = arguments.get(name);
+ for (String value : values)
{
- OutputStream out = socket.getOutputStream();
- try
- {
- // Create the output stream to SOLR
- byte[] tmp = ("POST " + postUpdateAction + " HTTP/1.0\r\n").getBytes("ASCII");
- out.write(tmp, 0, tmp.length);
-
- // Set all the headers
- writeCredentials(out);
-
- // Headers must include the following:
- // Content-Type
- // Content-Length
- // The content-length is calculated using the entire body length, which therefore includes the length of all the metadata fields as well.
-
- // Come up with a boundary. Ideally, the boundary should be something that doesn't exist in any of the data. In practice, that would mean
- // scanning all such data at least twice: once to make sure we avoided all boundary collisions, and a second time to actually output the data.
- // This is such a huge chunk of overhead, I've decided for now to just punt and pick something that's pretty unlikely.
-
-
- // Calculate the content length. To do this, we have to walk through the entire multipart assembly process, but calculate the length rather than output
- // anything.
-
- int totalLength = 0;
- // Count the id.
- totalLength += lengthField(LITERAL+idAttributeName,documentURI);
- // Count the acls
- totalLength += lengthACLs("share",shareAcls,shareDenyAcls);
- totalLength += lengthACLs("document",acls,denyAcls);
- // Count the arguments
- Iterator iter = arguments.keySet().iterator();
- while (iter.hasNext())
- {
- String name = (String)iter.next();
- List values = (List)arguments.get(name);
- int j = 0;
- while (j < values.size())
- {
- String value = (String)values.get(j++);
- totalLength += lengthField(name,value);
- }
- }
- // Count the metadata.
- iter = document.getFields();
- while (iter.hasNext())
- {
- String fieldName = (String)iter.next();
- String newFieldName = (String)sourceTargets.get(fieldName);
- if (newFieldName == null)
- newFieldName = fieldName;
- // Make SURE we can't double up on the id field inadvertantly!
- if (newFieldName.length() > 0)
- {
- if (newFieldName.toLowerCase().equals(idAttributeName.toLowerCase()))
- newFieldName = ID_METADATA;
- String[] values = document.getFieldAsStrings(fieldName);
- // We only handle strings right now!!!
- int k = 0;
- while (k < values.length)
- {
- String value = values[k++];
- totalLength += lengthField(LITERAL+newFieldName,value);
- }
- }
- }
- // Count the commitWithin parameter
- if (commitWithin != null)
- totalLength += lengthField(COMMITWITHIN_METADATA,commitWithin);
- // Count the binary data
- totalLength += lengthPreamble();
- totalLength += lengthBoundary("application/octet-stream","myfile",document.getFileName());
- totalLength += length;
- // Count the postamble
- totalLength += lengthPostamble();
- // Count the end marker.
- totalLength += endBytes.length;
-
- // Now, output the content-length header, and another newline, to start the data.
- tmp = ("Content-Length: "+Integer.toString(totalLength)+"\r\n").getBytes("ASCII");
- out.write(tmp, 0, tmp.length);
-
- tmp = ("Content-Type: multipart/form-data; boundary=").getBytes("ASCII");
- out.write(tmp, 0, tmp.length);
- out.write(separatorBytes, 0, separatorBytes.length);
-
- // End of headers.
- tmp = "\r\n".getBytes("ASCII");
- out.write(tmp, 0, tmp.length);
-
- // Write the id field
- writeField(out,LITERAL+idAttributeName,documentURI);
-
- // Write the access token information
- writeACLs(out,"share",shareAcls,shareDenyAcls);
- writeACLs(out,"document",acls,denyAcls);
-
- // Write the arguments
- iter = arguments.keySet().iterator();
- while (iter.hasNext())
- {
- String name = (String)iter.next();
- List values = (List)arguments.get(name);
- int j = 0;
- while (j < values.size())
- {
- String value = (String)values.get(j++);
- writeField(out,name,value);
- }
- }
-
- // Write the metadata, each in a field by itself
- iter = document.getFields();
- while (iter.hasNext())
- {
- String fieldName = (String)iter.next();
- String newFieldName = (String)sourceTargets.get(fieldName);
- if (newFieldName == null)
- newFieldName = fieldName;
- if (newFieldName.length() > 0)
- {
- if (newFieldName.toLowerCase().equals(idAttributeName.toLowerCase()))
- newFieldName = ID_METADATA;
- String[] values = document.getFieldAsStrings(fieldName);
- // We only handle strings right now!!!
- int k = 0;
- while (k < values.length)
- {
- String value = values[k++];
- writeField(out,LITERAL+newFieldName,value);
- }
- }
- }
-
- // Write the commitWithin parameter
- if (commitWithin != null)
- writeField(out,COMMITWITHIN_METADATA,commitWithin);
-
- // Write the content
- writePreamble(out);
-
- writeBoundary(out,"application/octet-stream","myfile",document.getFileName());
-
- // Stream the data
- long total = 0;
- long now, later;
- now = System.currentTimeMillis();
-
- byte[] bytes = new byte[buffersize];
-
- // Write out the contents of the inputstream to the socket
- while (true)
- {
- int count;
- // Specially catch all errors that come from reading the input stream itself.
- // This will help us segregate errors that come from the stream vs. those that come from the ingestion system.
- try
- {
- count = is.read(bytes);
- }
- catch (java.net.SocketTimeoutException ioe)
- {
- // We have to catch socket timeout exceptions specially, because they are derived from InterruptedIOException
- // They are otherwise just like IOExceptions
-
- // Log the error
- Logging.ingest.warn("Error reading data for transmission to Ingestion API: "+ioe.getMessage(),ioe);
-
- activityStart = new Long(fullStartTime);
- activityCode = "-1";
- activityDetails = "Couldn't read document: "+ioe.getMessage();
-
- // If this continues, we should indeed abort the job. Retries should not go on indefinitely either; 2 hours is plenty
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("IO error reading document for ingestion: "+ioe.getMessage()+"; read will be retried again later",
- ioe,
- currentTime + interruptionRetryTime,
- currentTime + 2L * 60L * 60000L,
- -1,
- true);
-
- }
- catch (InterruptedIOException ioe)
- {
- // If the transfer was interrupted, it may be because we are shutting down the thread.
-
- // Third-party library exceptions derived from InterruptedIOException are possible; if the stream comes from httpclient especially.
- // If we see one of these, we treat it as "not an interruption".
- if (!ioe.getClass().getName().equals("java.io.InterruptedIOException"))
- {
- // Log the error
- Logging.ingest.warn("Error reading data for transmission to Ingestion API: "+ioe.getMessage(),ioe);
-
- activityStart = new Long(fullStartTime);
- activityCode = "-1";
- activityDetails = "Couldn't read document: "+ioe.getMessage();
-
- // If this continues, we should indeed abort the job. Retries should not go on indefinitely either; 2 hours is plenty
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("IO error reading document for ingestion: "+ioe.getMessage()+"; read will be retried again later",
- ioe,
- currentTime + interruptionRetryTime,
- currentTime + 2L * 60L * 60000L,
- -1,
- true);
- }
- else
- throw ioe;
- }
- catch (IOException ioe)
- {
- // We need to decide whether to throw a service interruption or lcf exception, based on what went wrong.
- // We never retry here; the cause is the repository, so there's not any point.
-
- // Log the error
- Logging.ingest.warn("Error reading data for transmission to Ingestion API: "+ioe.getMessage(),ioe);
-
- activityStart = new Long(fullStartTime);
- activityCode = "-1";
- activityDetails = "Couldn't read document: "+ioe.getMessage();
-
- // If this continues, we should indeed abort the job. Retries should not go on indefinitely either; 2 hours is plenty
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("IO error reading document for ingestion: "+ioe.getMessage()+"; read will be retried again later",
- ioe,
- currentTime + interruptionRetryTime,
- currentTime + 2L * 60L * 60000L,
- -1,
- true);
- }
-
- if (count == -1)
- break;
- readFromDocumentStreamYet = true;
- out.write(bytes,0,count);
- total += (long)count;
- }
-
- // Write the postamble
- writePostamble(out);
-
- // Write the end marker
- out.write(endBytes, 0, endBytes.length);
-
- out.flush();
-
- later = System.currentTimeMillis();
- if (Logging.ingest.isDebugEnabled())
- Logging.ingest.debug("Total bytes posted: " + new Long(total).toString() + ", total time: " + (later - now));
-
- // Now, process response
- CodeDetails cd;
- try
- {
- cd = getResponse(in);
- }
- catch (ServiceInterruption si)
- {
- activityStart = new Long(now);
- activityCode = "-2";
- activityDetails = si.getMessage();
- throw si;
- }
-
-
- activityStart = new Long(now);
- activityBytes = new Long(length);
- activityCode = cd.getCode();
- activityDetails = cd.getDetails();
-
- int codeValue = cd.getCodeValue();
-
- // A negative number means http error of some kind.
- if (codeValue < 0)
- throw new ManifoldCFException("Http protocol error");
-
- // 200 means we got a status document back
- if (codeValue == 200)
- {
- // Look at response XML
- cd.parseIngestionResponse();
- rval = true;
- return;
- }
-
- // Anything else means the document didn't ingest.
- // There are three possibilities here:
- // 1) The document will NEVER ingest (it's illegal), in which case a 400 or 403 will be returned, and
- // 2) There is a transient error, in which case we will want to try again, after a wait.
- // If the situation is (2), then we CAN'T retry if we already read any of the stream; therefore
- // we are forced to throw a "service interrupted" exception, and let the caller reschedule
- // the ingestion.
- // 3) Something is wrong with the setup, e.g. bad credentials. In this case we chuck a ManifoldCFException,
- // since this will abort the current activity entirely.
-
- if (codeValue == 401)
- throw new ManifoldCFException("Bad credentials for ingestion",ManifoldCFException.SETUP_ERROR);
-
- if ((codeValue >= 400 && codeValue < 500) ||
- (codeValue == 500 && cd.getDetails() != null && cd.getDetails().indexOf("org.apache.tika.exception.TikaException") != -1))
- {
- rval = false;
- return;
- }
-
- // If this continues, we should indeed abort the job. Retries should not go on indefinitely either; 2 hours is plenty
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("Error "+Integer.toString(codeValue)+" from ingestion request; ingestion will be retried again later",
- new ManifoldCFException("Ingestion HTTP error code "+Integer.toString(codeValue)),
- currentTime + interruptionRetryTime,
- currentTime + 2L * 60L * 60000L,
- -1,
- true);
- }
- finally
+ writeField(contentStreamUpdateRequest,name,value);
+ }
+ }
+
+ // Write the metadata, each in a field by itself
+ Iterator<String> iter = document.getFields();
+ while (iter.hasNext())
+ {
+ String fieldName = iter.next();
+ String newFieldName = sourceTargets.get(fieldName);
+ if (newFieldName == null)
+ newFieldName = fieldName;
+ if (newFieldName.length() > 0)
+ {
+ if (newFieldName.toLowerCase().equals(idAttributeName.toLowerCase()))
+ newFieldName = ID_METADATA;
+ String[] values = document.getFieldAsStrings(fieldName);
+ // We only handle strings right now!!!
+ for (String value : values)
{
- out.close();
+ writeField(contentStreamUpdateRequest,LITERAL+newFieldName,value);
}
}
- finally
- {
- in.close();
- }
}
- finally
+
+ // Write the commitWithin parameter
+ if (commitWithin != null)
+ writeField(contentStreamUpdateRequest,COMMITWITHIN_METADATA,commitWithin);
+
+ contentStreamUpdateRequest.addContentStream(new RepositoryDocumentStream(is,length));
+
+ // Fire off the request.
+ // Note: I need to know whether the document has been permanently rejected or not, but we currently have
+ // no means to determine that. Analysis of SolrServerExceptions that have been thrown is likely needed.
+ try
{
- try
- {
- socket.close();
- }
- catch (InterruptedIOException e)
- {
- throw e;
- }
- catch (IOException e)
- {
- Logging.ingest.debug("Error closing socket: "+e.getMessage(),e);
- // Do NOT rethrow
- }
+ readFromDocumentStreamYet = true;
+ solrServer.request(contentStreamUpdateRequest);
+
+ // Successful completion
+ activityStart = new Long(fullStartTime);
+ activityBytes = new Long(length);
+ activityCode = "OK";
+ activityDetails = null;
+
+ rval = true;
+ return;
+ }
+ catch (SolrServerException e)
+ {
+ // Log what happened to us
+ activityStart = new Long(fullStartTime);
+ activityBytes = new Long(length);
+ activityCode = "FAILED";
+ activityDetails = e.getMessage();
+
+ // Use the exception text to determine the proper result.
+ if (e.getMessage().indexOf("org.apache.tika.exception.TikaException") != -1)
+ {
+ // Can't process the document, so don't keep trying.
+ rval = false;
+ return;
+ }
+
+ // Otherwise, abort the job. This is not the right thing to do probably, but we
+ // need to know SolrJ's exception cases before we can get this logic right.
+ // MHL
+ throw e;
}
- }
- catch (UnsupportedEncodingException ioe)
- {
- throw new ManifoldCFException("Fatal ingestion error: "+ioe.getMessage(),ioe);
}
catch (java.net.SocketTimeoutException ioe)
{
// These are just like IO errors, but since they are derived from InterruptedIOException, they have to be caught first.
// Log the error
- Logging.ingest.warn("Error connecting to ingestion API: "+ioe.getMessage(),ioe);
+ Logging.ingest.warn("Error indexing into Solr: "+ioe.getMessage(),ioe);
activityStart = new Long(fullStartTime);
- activityCode = "-1";
+ activityCode = "SOCKET TIMEOUT";
activityDetails = ioe.getMessage();
throw ioe;
@@ -1451,18 +817,18 @@ public class HttpPoster
// since we really don't know what happened for sure.
// Record the attempt
- activityCode = "-103";
- activityDetails = "Presuming an ingestion rejection: "+ioe.getMessage();
+ activityCode = "SOCKET CLOSE";
+ activityDetails = "Presuming an indexing rejection: "+ioe.getMessage();
rval = false;
return;
}
// Record the attempt
- activityCode = "-1";
+ activityCode = "IO ERROR";
activityDetails = ioe.getMessage();
// Log the error
- Logging.ingest.warn("Error communicating with Ingestion API: "+ioe.getMessage(),ioe);
+ Logging.ingest.warn("Error indexing into Solr: "+ioe.getMessage(),ioe);
throw ioe;
}
@@ -1540,117 +906,33 @@ public class HttpPoster
// Open a socket to ingest, and to the response stream to get the post result
try
{
- // Set up the socket, and the (optional) secure socket.
- Socket socket = createSocket(responseRetries);
- try
- {
- InputStream in = socket.getInputStream();
- try
- {
- OutputStream out = socket.getOutputStream();
- try
- {
- byte[] requestBytes = ("<delete><id>"+xmlEncode(documentURI)+"</id></delete>").getBytes("UTF-8");
- long startTime = System.currentTimeMillis();
- byte[] tmp = ("POST " + postRemoveAction + " HTTP/1.0\r\n").getBytes("ASCII");
- out.write(tmp, 0, tmp.length);
-
- // Set all the headers
- writeCredentials(out);
- tmp = ("Content-Length: "+Integer.toString(requestBytes.length)+"\r\n").getBytes("ASCII");
- out.write(tmp, 0, tmp.length);
- tmp = ("Content-Type: text/xml; charset=UTF-8\r\n\r\n").getBytes("ASCII");
- out.write(tmp, 0, tmp.length);
-
- out.write(requestBytes);
-
- out.flush();
-
- if (Logging.ingest.isDebugEnabled())
- Logging.ingest.debug("Delete posted");
-
- CodeDetails cd;
- try
- {
- cd = getResponse(in);
- }
- catch (ServiceInterruption si)
- {
- activityStart = new Long(startTime);
- activityCode = "-2";
- activityDetails = si.getMessage();
- throw si;
- }
-
- activityStart = new Long(startTime);
- activityCode = cd.getCode();
- activityDetails = cd.getDetails();
-
- int codeValue = cd.getCodeValue();
-
- if (codeValue < 0)
- throw new ManifoldCFException("Http protocol error");
-
- // 200 means we got an xml document back
- if (codeValue == 200)
- {
- // Look at response XML
- cd.parseRemovalResponse();
- return;
- }
-
- // We ignore everything in the range from 400-500 now
- if (codeValue == 401)
- throw new ManifoldCFException("Bad credentials for ingestion",ManifoldCFException.SETUP_ERROR);
-
- if (codeValue >= 400 && codeValue < 500)
- return;
-
- // Anything else means the document didn't delete. Throw the error.
- throw new ManifoldCFException("Error deleting document: '"+cd.getDescription()+"'");
- }
- finally
- {
- out.close();
- }
- }
- finally
- {
- in.close();
- }
- }
- finally
- {
- try
- {
- socket.close();
- }
- catch (InterruptedIOException e)
- {
- throw e;
- }
- catch (IOException e)
- {
- Logging.ingest.debug("Error closing socket: "+e.getMessage(),e);
- // Do NOT rethrow
- }
- }
- }
- catch (UnsupportedEncodingException ioe)
- {
- throw new ManifoldCFException("Fatal ingestion error: "+ioe.getMessage(),ioe);
+ solrServer.deleteById(documentURI);
+
+ // Success
+ activityStart = new Long(fullStartTime);
+ activityCode = "OK";
+ activityDetails = null;
+ return;
}
catch (InterruptedIOException ioe)
{
return;
}
+ catch (SolrServerException e)
+ {
+ activityStart = new Long(fullStartTime);
+ activityCode = "FAILED";
+ activityDetails = e.getMessage();
+
+ throw e;
+ }
catch (IOException ioe)
{
// Log the error
- Logging.ingest.warn("Error communicating with Ingestion API: "+ioe.getMessage(),ioe);
+ Logging.ingest.warn("Error deleting document: "+ioe.getMessage(),ioe);
activityStart = new Long(fullStartTime);
- activityCode = "-1";
+ activityCode = "IO ERROR";
activityDetails = ioe.getMessage();
throw ioe;
@@ -1703,94 +985,19 @@ public class HttpPoster
{
try
{
- // Do the operation!
- // Open a socket to update request handler, and to the response stream to get the post result
try
{
- // Set up the socket, and the (optional) secure socket.
- Socket socket = createSocket(responseRetries);
- try
- {
- InputStream in = socket.getInputStream();
- try
- {
- OutputStream out = socket.getOutputStream();
- try
- {
- // Create the output stream to GTS
- byte[] tmp = ("GET " + postUpdateAction + "?commit=true HTTP/1.0\r\n").getBytes("ASCII");
- out.write(tmp, 0, tmp.length);
-
- writeCredentials(out);
-
- tmp = ("Content-Length: 0\r\n\r\n").getBytes("ASCII");
- out.write(tmp, 0, tmp.length);
-
- if (Logging.ingest.isDebugEnabled())
- Logging.ingest.debug("Commit request posted");
-
- out.flush();
-
- CodeDetails cd = getResponse(in);
-
- int codeValue = cd.getCodeValue();
- if (codeValue < 0)
- throw new ManifoldCFException("Http protocol error");
-
- // 200 means everything went OK
- if (codeValue == 200)
- {
- cd.parseCommitResponse();
- return;
- }
-
- // We ignore everything in the range from 400-500 now
- if (codeValue == 401)
- throw new ManifoldCFException("Bad credentials for commit request",ManifoldCFException.SETUP_ERROR);
-
- // Anything else means the info request failed.
- throw new ManifoldCFException("Error connecting to update request API: '"+cd.getDescription()+"'");
- }
- finally
- {
- out.close();
- }
- }
- finally
- {
- in.close();
- }
- }
- finally
- {
- try
- {
- socket.close();
- }
- catch (InterruptedIOException e)
- {
- throw e;
- }
- catch (IOException e)
- {
- Logging.ingest.debug("Error closing socket: "+e.getMessage(),e);
- // Do NOT rethrow
- }
- }
- }
- catch (UnsupportedEncodingException ioe)
- {
- throw new ManifoldCFException("Fatal commit error: "+ioe.getMessage(),ioe);
+ // Do the operation!
+ solrServer.commit();
}
catch (InterruptedIOException ioe)
{
- // Exit the thread.
return;
}
catch (IOException ioe)
{
// Log the error
- Logging.ingest.warn("Error communicating with update request handler: "+ioe.getMessage(),ioe);
+ Logging.ingest.warn("Error committing: "+ioe.getMessage(),ioe);
throw ioe;
}
}
@@ -1828,83 +1035,11 @@ public class HttpPoster
try
{
// Do the operation!
- // Open a socket to ingest, and to the response stream to get the post result
try
{
- // Set up the socket, and the (optional) secure socket.
- Socket socket = createSocket(responseRetries);
- try
- {
- InputStream in = socket.getInputStream();
- try
- {
- OutputStream out = socket.getOutputStream();
- try
- {
- // Create the output stream to GTS
- byte[] tmp = ("GET " + postStatusAction + " HTTP/1.0\r\n").getBytes("ASCII");
- out.write(tmp, 0, tmp.length);
-
- writeCredentials(out);
-
- tmp = ("Content-Length: 0\r\n\r\n").getBytes("ASCII");
- out.write(tmp, 0, tmp.length);
-
- if (Logging.ingest.isDebugEnabled())
- Logging.ingest.debug("Status request posted");
-
- out.flush();
-
- CodeDetails cd = getResponse(in);
-
- int codeValue = cd.getCodeValue();
- if (codeValue < 0)
- throw new ManifoldCFException("Http protocol error");
-
- // 200 means everything went OK
- if (codeValue == 200)
- {
- cd.parseStatusResponse();
- return;
- }
-
- // We ignore everything in the range from 400-500 now
- if (codeValue == 401)
- throw new ManifoldCFException("Bad credentials for ingestion",ManifoldCFException.SETUP_ERROR);
-
- // Anything else means the info request failed.
- throw new ManifoldCFException("Error connecting to ingestion API: '"+cd.getDescription()+"'");
- }
- finally
- {
- out.close();
- }
- }
- finally
- {
- in.close();
- }
- }
- finally
- {
- try
- {
- socket.close();
- }
- catch (InterruptedIOException e)
- {
- throw e;
- }
- catch (IOException e)
- {
- Logging.ingest.debug("Error closing socket: "+e.getMessage(),e);
- // Do NOT rethrow
- }
- }
- }
- catch (UnsupportedEncodingException ioe)
- {
- throw new ManifoldCFException("Fatal ingestion error: "+ioe.getMessage(),ioe);
+ // MHL to check status via SolrJ
+ if (false)
+ throw new IOException("blah");
}
catch (InterruptedIOException ioe)
{
@@ -1914,7 +1049,7 @@ public class HttpPoster
catch (IOException ioe)
{
// Log the error
- Logging.ingest.warn("Error communicating with Ingestion API: "+ioe.getMessage(),ioe);
+ Logging.ingest.warn("Error checking status: "+ioe.getMessage(),ioe);
throw ioe;
}
}
@@ -1930,184 +1065,41 @@ public class HttpPoster
}
}
- /** Code+details paper object */
- protected static class CodeDetails
+ /** ContentStream that hands a repository document's binary content to SolrJ.
+ * The wrapped InputStream is passed through unchanged and is never closed by this class.
+ */
+ protected static class RepositoryDocumentStream extends ContentStreamBase
{
- protected String code;
- protected int codeValue;
- protected String details;
- protected String res;
- protected XMLDoc returnDoc;
- protected String rawString;
-
- public CodeDetails(String res, XMLDoc returnDoc, String rawString)
- {
- this.res = res;
- this.returnDoc = returnDoc;
- this.rawString = rawString;
- codeValue = -100;
- code = "-100";
- details = "Http response was improperly formed";
-
- int firstSpace = res.indexOf(" ");
- if (firstSpace != -1)
- {
- int secondSpace = res.indexOf(" ", firstSpace + 1);
- if (secondSpace != -1)
- {
- code = res.substring(firstSpace + 1, secondSpace);
- details = res.substring(secondSpace+1).trim();
- try
- {
- codeValue = (int)(new Double(code).doubleValue());
- if (codeValue == 200)
- details = null;
- }
- catch (NumberFormatException e)
- {
- // Fall through and leave codeValue unaltered
- }
- }
- }
- }
-
- public String getCode()
- {
- return code;
- }
-
- public int getCodeValue()
- {
- return codeValue;
- }
-
- public String getDetails()
- {
- return details;
- }
-
- public XMLDoc getReturnDoc()
- {
- return returnDoc;
- }
-
- public String getDescription()
- throws ManifoldCFException
+ /** Binary content of the document being indexed. */
+ protected InputStream is;
+ /** Declared length of the binary content, in bytes. */
+ protected long length;
+
+ /** Wrap a document's content stream together with its declared length. */
+ public RepositoryDocumentStream(InputStream is, long length)
{
- return res + "\r\n" + ((returnDoc!=null)?returnDoc.getXML():((rawString!=null)?rawString:""));
+ this.is = is;
+ this.length = length;
}
-
- public void parseIngestionResponse()
- throws ManifoldCFException
+
+ /** Return the wrapped document stream itself (no copy, no buffering). */
+ @Override
+ public InputStream getStream() throws IOException
{
- // Look at response XML
- String statusValue = "unknown";
- XMLDoc doc = getReturnDoc();
- if (doc != null)
- {
- if (Logging.ingest.isDebugEnabled())
- Logging.ingest.debug("SOLR: Saw ingestion response document '"+doc.getXML()+"'");
- //Object root = doc.getRoot();
- ArrayList list = doc.processPath("*",null);
- int k = 0;
- while (k < list.size())
- {
- Object listNode = list.get(k++);
- if (doc.getNodeName(listNode).equals("response"))
- {
- ArrayList list2 = doc.processPath("*",listNode);
- int q = 0;
- while (q < list2.size())
- {
- Object respNode = list2.get(q++);
- if (doc.getNodeName(respNode).equals("lst"))
- {
- String lstName = doc.getValue(respNode,"name");
- if (lstName.equals("responseHeader"))
- {
- ArrayList list3 = doc.processPath("*",respNode);
- int z = 0;
- while (z < list3.size())
- {
- Object headerNode = list3.get(z++);
- if (doc.getNodeName(headerNode).equals("int"))
- {
- String value = doc.getValue(headerNode,"name");
- if (value.equals("status"))
- {
- statusValue = doc.getData(headerNode).trim();
- }
- }
- }
- }
- }
- }
- }
- }
- if (statusValue.equals("0"))
- return;
-
- throw new ManifoldCFException("Ingestion returned error: "+statusValue);
- }
- else
- throw new ManifoldCFException("XML parsing error on response");
+ return is;
}
-
- public void parseRemovalResponse()
- throws ManifoldCFException
+
+ @Override
+ public Reader getReader() throws IOException
{
- parseIngestionResponse();
+ // MHL - I think this should always return null, but we need to confirm
+ //String charset = getCharsetFromContentType( this.getContentType() );
+ //return charset == null ? new InputStreamReader( this.getStream() ) : new InputStreamReader( this.getStream(), charset );
+ return null;
}
- public void parseCommitResponse()
- throws ManifoldCFException
+ /** Document content is always sent as opaque binary. */
+ @Override
+ public String getContentType()
{
- parseIngestionResponse();
+ return "application/octet-stream";
}
-
- public void parseStatusResponse()
- throws ManifoldCFException
- {
- // Look at response XML
- String statusValue = "unknown";
- XMLDoc doc = getReturnDoc();
- if (doc != null)
- {
- if (Logging.ingest.isDebugEnabled())
- Logging.ingest.debug("SOLR: Saw status response document '"+doc.getXML()+"'");
- //Object root = doc.getRoot();
- ArrayList list = doc.processPath("*",null);
- int k = 0;
- while (k < list.size())
- {
- Object listNode = list.get(k++);
- if (doc.getNodeName(listNode).equals("response"))
- {
- ArrayList list2 = doc.processPath("*",listNode);
- int q = 0;
- while (q < list2.size())
- {
- Object respNode = list2.get(q++);
- if (doc.getNodeName(respNode).equals("str"))
- {
- String value = doc.getValue(respNode,"name");
- if (value.equals("status"))
- {
- statusValue = doc.getData(respNode).trim();
- }
- }
- }
- }
- }
- if (statusValue.equals("OK"))
- return;
- throw new ManifoldCFException("Status error: "+statusValue);
- }
- else
- throw new ManifoldCFException("XML parsing error on response");
- }
+
+ /** Report the declared content length through the ContentStream contract
+ * (it was previously stored but never exposed); a negative length is reported as null (unknown).
+ */
+ @Override
+ public Long getSize()
+ {
+ return (length < 0L) ? null : Long.valueOf(length);
+ }
}
-
+
}
Modified: manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConfig.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConfig.java?rev=1427229&r1=1427228&r2=1427229&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConfig.java (original)
+++ manifoldcf/branches/CONNECTORS-594/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConfig.java Mon Dec 31 19:46:25 2012
@@ -27,12 +27,47 @@ public class SolrConfig
// Configuration parameters
+ /** Solr type */
+ public static final String PARAM_SOLR_TYPE = "Solr type";
+ /** Type: Standard */
+ public static final String SOLR_TYPE_STANDARD = "standard";
+ /** Type: Solr Cloud */
+ public static final String SOLR_TYPE_SOLRCLOUD = "solrcloud";
+
+ // SolrCloud zookeeper parameters
+
+ // Zookeeper hosts, as nodes
+ /** Zookeeper node */
+ public static final String NODE_ZOOKEEPER = "zookeeper";
+ /** Zookeeper hostname */
+ public static final String ATTR_HOST = "host";
+ /** Zookeeper port */
+ public static final String ATTR_PORT = "port";
+
+ /** ZooKeeper client timeout */
+ public static final String PARAM_ZOOKEEPER_CLIENT_TIMEOUT = "ZooKeeper client timeout";
+ /** ZooKeeper connect timeout */
+ public static final String PARAM_ZOOKEEPER_CONNECT_TIMEOUT = "ZooKeeper connect timeout";
+ /** Collection name */
+ public static final String PARAM_COLLECTION = "Collection";
+
+ // General indexing parameters
+
/** Protocol */
public static final String PARAM_PROTOCOL = "Server protocol";
+ /** Protocol: http */
+ public static final String PROTOCOL_TYPE_HTTP = "http";
+ /** Protocol: https */
+ public static final String PROTOCOL_TYPE_HTTPS = "https";
+
/** Server name */
public static final String PARAM_SERVER = "Server name";
/** Port */
public static final String PARAM_PORT = "Server port";
+ /** Connection timeout */
+ public static final String PARAM_CONNECTION_TIMEOUT = "Connection timeout";
+ /** Socket timeout */
+ public static final String PARAM_SOCKET_TIMEOUT = "Socket timeout";
/** Webapp */
public static final String PARAM_WEBAPPNAME = "Server web application";
/** Core */