You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/27 17:12:31 UTC

svn commit: r1606118 - in /manifoldcf/trunk: connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/

Author: kwright
Date: Fri Jun 27 15:12:31 2014
New Revision: 1606118

URL: http://svn.apache.org/r1606118
Log:
Part of the fix for CONNECTORS-985.  Fix elastic search.

Modified:
    manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchAction.java
    manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchConnection.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java

Modified: manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchAction.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchAction.java?rev=1606118&r1=1606117&r2=1606118&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchAction.java (original)
+++ manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchAction.java Fri Jun 27 15:12:31 2014
@@ -22,6 +22,8 @@ package org.apache.manifoldcf.agents.out
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.HttpClient;
 
+import java.io.IOException;
+
 import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
 import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
 import org.apache.manifoldcf.crawler.system.Logging;
@@ -52,4 +54,21 @@ public class ElasticSearchAction extends
     setResult(Result.ERROR, error);
     Logging.connectors.warn("ES: Commit failed: "+getResponse());
   }
+  
+  @Override
+  protected void handleIOException(IOException e)
+    throws ManifoldCFException, ServiceInterruption {
+    // We want a quicker failure here than the overridden handler gives; interrupted I/O (other than a socket timeout) surfaces as INTERRUPTED.
+    if (e instanceof java.io.InterruptedIOException && !(e instanceof java.net.SocketTimeoutException))
+      throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
+    setResult(Result.ERROR, e.getMessage());
+    long currentTime = System.currentTimeMillis();
+    // One notification attempt, then we're done.
+    throw new ServiceInterruption("IO exception: "+e.getMessage(),e,
+        currentTime + 60000L, // one minute from now; presumably the earliest retry time -- confirm against ServiceInterruption
+        currentTime + 1L * 60L * 60000L, // one hour from now; presumably the give-up (fail) time -- confirm
+        1, // presumably the retry count, matching the "one attempt" note above -- confirm
+        false); // presumably the abort-on-fail flag -- confirm
+  }
+
 }

Modified: manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchConnection.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchConnection.java?rev=1606118&r1=1606117&r2=1606118&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchConnection.java (original)
+++ manifoldcf/trunk/connectors/elasticsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/elasticsearch/ElasticSearchConnection.java Fri Jun 27 15:12:31 2014
@@ -149,6 +149,26 @@ public class ElasticSearchConnection
         exception = e;
       }
     }
+
+    public void finishUp() // Waits for this thread to end, then rethrows any failure it recorded.
+      throws HttpException, IOException, InterruptedException
+    {
+      join(); // block until the thread has finished
+      Throwable t = exception; // recorded failure, or null when there was none
+      if (t != null)
+      {
+        if (t instanceof HttpException)
+          throw (HttpException)t;
+        else if (t instanceof IOException)
+          throw (IOException)t;
+        else if (t instanceof RuntimeException)
+          throw (RuntimeException)t;
+        else if (t instanceof Error)
+          throw (Error)t;
+        else
+          throw new RuntimeException("Unexpected exception thrown: "+t.getMessage(),t); // anything else is wrapped, keeping the original as the cause
+      }
+    }
     
     public int getResultCode()
     {
@@ -178,22 +198,7 @@ public class ElasticSearchConnection
       ct.start();
       try
       {
-        ct.join();
-        Throwable t = ct.getException();
-        if (t != null)
-        {
-          if (t instanceof HttpException)
-            throw (HttpException)t;
-          else if (t instanceof IOException)
-            throw (IOException)t;
-          else if (t instanceof RuntimeException)
-            throw (RuntimeException)t;
-          else if (t instanceof Error)
-            throw (Error)t;
-          else
-            throw new RuntimeException("Unexpected exception thrown: "+t.getMessage(),t);
-        }
-        
+        ct.finishUp();
         response = ct.getResponse();
         return handleResultCode(ct.getResultCode(), response);
       }
@@ -248,14 +253,16 @@ public class ElasticSearchConnection
     throw new ManifoldCFException("Unexpected HTTP result code: "+code+": "+response);
   }
 
-  private void handleHttpException(HttpException e)
+  protected void handleHttpException(HttpException e)
     throws ManifoldCFException, ServiceInterruption {
     setResult(Result.ERROR, e.getMessage());
     throw new ManifoldCFException(e);
   }
   
-  private void handleIOException(IOException e)
+  protected void handleIOException(IOException e)
     throws ManifoldCFException, ServiceInterruption {
+    if (e instanceof java.io.InterruptedIOException && !(e instanceof java.net.SocketTimeoutException))
+      throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
     setResult(Result.ERROR, e.getMessage());
     long currentTime = System.currentTimeMillis();
     // All IO exceptions are treated as service interruptions, retried for an hour

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java?rev=1606118&r1=1606117&r2=1606118&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java Fri Jun 27 15:12:31 2014
@@ -363,6 +363,7 @@ public class JobNotificationThread exten
                       // If either we are going to be requeuing beyond the fail time, OR
                       // the number of retries available has hit 0, THEN we treat this
                       // as either an "ignore" or a hard error.
+                      ///System.out.println("jsr.getFailTime()="+jsr.getFailTime()+"; e.getRetryTime()="+e.getRetryTime()+"; jsr.getFailRetryCount()="+jsr.getFailRetryCount());
                       if (!e.jobInactiveAbort() && (jsr.getFailTime() != -1L && jsr.getFailTime() < e.getRetryTime() ||
                         jsr.getFailRetryCount() == 0))
                       {
@@ -370,9 +371,11 @@ public class JobNotificationThread exten
                         if (e.isAbortOnFail())
                         {
                           // Note the error in the job, and transition to inactive state
-                          String message = e.jobInactiveAbort()?"":"Repeated service interruptions during notification"+((e.getCause()!=null)?": "+e.getCause().getMessage():"");
-                          if (jobManager.errorAbort(jobID,message) && message.length() > 0)
+                          String message = e.jobInactiveAbort()?"":"Repeated service interruptions during delete notification"+((e.getCause()!=null)?": "+e.getCause().getMessage():"");
+                          if (message.length() > 0)
                             Logging.jobs.error(message,e.getCause());
+                          // A delete can't be aborted, so remove the job outright instead of moving it to an error state.
+                          jobManager.removeJob(jobID);
                           jsr.noteStarted();
                         }
                         else
@@ -385,6 +388,7 @@ public class JobNotificationThread exten
                       else
                       {
                         // Reset the job to the READYFORDELETENOTIFY state, updating the failtime and failcount fields
+                        //System.out.println("Retrying... e.getFailTime()="+e.getFailTime()+"; e.getFailRetryCount()="+e.getFailRetryCount());
                         jobManager.retryDeleteNotification(jsr,e.getFailTime(),e.getFailRetryCount());
                         jsr.noteStarted();
                       }