You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/06/05 04:25:26 UTC

svn commit: r1489686 - in /manifoldcf/trunk/connectors: dropbox/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/dropbox/ googledrive/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/googledrive/

Author: kwright
Date: Wed Jun  5 02:25:25 2013
New Revision: 1489686

URL: http://svn.apache.org/r1489686
Log:
More revisions to the code structure of the Dropbox and GoogleDrive connectors, to establish best practices for background thread structures and input stream handling.

Modified:
    manifoldcf/trunk/connectors/dropbox/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/dropbox/DropboxRepositoryConnector.java
    manifoldcf/trunk/connectors/googledrive/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/googledrive/GoogleDriveRepositoryConnector.java

Modified: manifoldcf/trunk/connectors/dropbox/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/dropbox/DropboxRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/dropbox/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/dropbox/DropboxRepositoryConnector.java?rev=1489686&r1=1489685&r2=1489686&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/dropbox/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/dropbox/DropboxRepositoryConnector.java (original)
+++ manifoldcf/trunk/connectors/dropbox/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/dropbox/DropboxRepositoryConnector.java Wed Jun  5 02:25:25 2013
@@ -787,17 +787,7 @@ public class DropboxRepositoryConnector 
         GetObjectThread objt = new GetObjectThread(nodeId);
         try {
           objt.start();
-          objt.join();
-          Throwable thr = objt.getException();
-          if (thr != null) {
-            if (thr instanceof DropboxException) {
-              throw (DropboxException) thr;
-            } else if (thr instanceof RuntimeException) {
-              throw (RuntimeException) thr;
-            } else {
-              throw (Error) thr;
-            }
-          }
+          objt.finishUp();
         } catch (InterruptedException e) {
           objt.interrupt();
           throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
@@ -826,68 +816,82 @@ public class DropboxRepositoryConnector 
 
         } else {
           // its a file
-          doLog = true;
-          
-          // content ingestion
-          RepositoryDocument rd = new RepositoryDocument();
+          if (!scanOnly[i]) {
+            doLog = true;
+            
+            // content ingestion
+            RepositoryDocument rd = new RepositoryDocument();
 
-          // Length in bytes
-          long fileLength = dropboxObject.bytes;
-          //documentURI
-          String documentURI = dropboxObject.path;
-
-          if (dropboxObject.path != null)
-            rd.setFileName(dropboxObject.path);
-          if (dropboxObject.mimeType != null)
-            rd.setMimeType(dropboxObject.mimeType);
-          if (dropboxObject.modified != null)
-            rd.setModifiedDate(com.dropbox.client2.RESTUtility.parseDate(dropboxObject.modified));
-          // There doesn't appear to be a created date...
+            // Length in bytes
+            long fileLength = dropboxObject.bytes;
+            //documentURI
+            String documentURI = dropboxObject.path;
+
+            if (dropboxObject.path != null)
+              rd.setFileName(dropboxObject.path);
+            if (dropboxObject.mimeType != null)
+              rd.setMimeType(dropboxObject.mimeType);
+            if (dropboxObject.modified != null)
+              rd.setModifiedDate(com.dropbox.client2.RESTUtility.parseDate(dropboxObject.modified));
+            // There doesn't appear to be a created date...
+              
+            rd.addField("Modified", dropboxObject.modified);
+            rd.addField("Size", dropboxObject.size);
+            rd.addField("Path", dropboxObject.path);
+            rd.addField("Root", dropboxObject.root);
+            rd.addField("ClientMtime", dropboxObject.clientMtime);
+            rd.addField("mimeType", dropboxObject.mimeType);
+            rd.addField("rev", dropboxObject.rev);
             
-          rd.addField("Modified", dropboxObject.modified);
-          rd.addField("Size", dropboxObject.size);
-          rd.addField("Path", dropboxObject.path);
-          rd.addField("Root", dropboxObject.root);
-          rd.addField("ClientMtime", dropboxObject.clientMtime);
-          rd.addField("mimeType", dropboxObject.mimeType);
-          rd.addField("rev", dropboxObject.rev);
-          
-          getSession();
-          BackgroundStreamThread t = new BackgroundStreamThread(nodeId);
-          try {
-            t.start();
-            InputStream is = t.getSafeInputStream();
+            getSession();
+            BackgroundStreamThread t = new BackgroundStreamThread(nodeId);
             try {
-              rd.setBinary(is, fileLength);
-              activities.ingestDocument(nodeId, version, documentURI, rd);
-            } finally {
-              is.close();
+              t.start();
+              try {
+                InputStream is = t.getSafeInputStream();
+                try {
+                  rd.setBinary(is, fileLength);
+                  activities.ingestDocument(nodeId, version, documentURI, rd);
+                } finally {
+                  is.close();
+                }
+              } finally {
+                // Abort, no matter what
+                t.abort();
+              }
+
+              // This does a join
+              t.finishUp();
+
+              // No errors.  Record the fact that we made it.
+              errorCode = "OK";
+              fileSize = new Long(fileLength);
+            } catch (InterruptedException e) {
+              // We were interrupted out of the join, most likely.  Before we abandon the thread,
+              // send a courtesy interrupt.
+              t.interrupt();
+              throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
+                ManifoldCFException.INTERRUPTED);
+            } catch (InterruptedIOException e) {
+              t.interrupt();
+              throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
+                ManifoldCFException.INTERRUPTED);
+            } catch (IOException e) {
+              errorCode = "IO ERROR";
+              errorDesc = e.getMessage();
+              handleIOException(e);
+            } catch (DropboxException e) {
+              Logging.connectors.warn("DROPBOX: Error getting stream: " + e.getMessage(), e);
+              errorCode = "DROPBOX ERROR";
+              errorDesc = e.getMessage();
+              handleDropboxException(e);
             }
-            t.join();
-            errorCode = "OK";
-            fileSize = new Long(fileLength);
-          } catch (InterruptedException e) {
-            t.interrupt();
-            throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
-              ManifoldCFException.INTERRUPTED);
-          } catch (InterruptedIOException e) {
-            t.interrupt();
-            throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
-              ManifoldCFException.INTERRUPTED);
-          } catch (IOException e) {
-            errorCode = "IO ERROR";
-            errorDesc = e.getMessage();
-            handleIOException(e);
-          } catch (DropboxException e) {
-            Logging.connectors.warn("DROPBOX: Error getting stream: " + e.getMessage(), e);
-            errorCode = "DROPBOX ERROR";
-            errorDesc = e.getMessage();
-            handleDropboxException(e);
           }
         }
       } finally {
-        activities.recordActivity(new Long(startTime), ACTIVITY_READ,
-          fileSize, nodeId, errorCode, errorDesc, null);
+        if (doLog)
+          activities.recordActivity(new Long(startTime), ACTIVITY_READ,
+            fileSize, nodeId, errorCode, errorDesc, null);
       }
     }
   }
@@ -913,6 +917,22 @@ public class DropboxRepositoryConnector 
       }
     }
 
+    public void finishUp()
+      throws InterruptedException, DropboxException {
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof DropboxException)
+          throw (DropboxException) thr;
+        else if (thr instanceof RuntimeException)
+          throw (RuntimeException) thr;
+        else if (thr instanceof Error)
+          throw (Error) thr;
+        else
+          throw new RuntimeException("Unhandled exception of type: "+thr.getClass().getName(),thr);
+      }
+    }
+
     public DropboxAPI.Entry getResponse() {
       return response;
     }
@@ -997,9 +1017,10 @@ public class DropboxRepositoryConnector 
     }
     
     public void finishUp()
-      throws InterruptedException
+      throws InterruptedException, IOException, DropboxException
     {
       join();
+      checkException(responseException);
     }
     
     protected synchronized void checkException(Throwable exception)
@@ -1023,124 +1044,6 @@ public class DropboxRepositoryConnector 
 
   }
 
-  /** This input stream wraps a background transaction thread, so that
-  * the thread is ended when the stream is closed.
-  */
-  private static class BackgroundInputStream extends InputStream {
-    
-    private BackgroundStreamThread streamThread = null;
-    private InputStream xThreadInputStream = null;
-
-    public BackgroundInputStream(BackgroundStreamThread streamThread, InputStream xThreadInputStream)
-    {
-      this.streamThread = streamThread;
-      this.xThreadInputStream = xThreadInputStream;
-    }
-    
-    @Override
-    public int available()
-      throws IOException
-    {
-      if (xThreadInputStream != null)
-        return xThreadInputStream.available();
-      return super.available();
-    }
-    
-    @Override
-    public void close()
-      throws IOException
-    {
-      try
-      {
-        if (xThreadInputStream != null)
-        {
-          xThreadInputStream.close();
-          xThreadInputStream = null;
-        }
-      }
-      finally
-      {
-        if (streamThread != null)
-        {
-          streamThread.abort();
-          try
-          {
-            streamThread.finishUp();
-          }
-          catch (InterruptedException e)
-          {
-            throw new InterruptedIOException(e.getMessage());
-          }
-          streamThread = null;
-        }
-      }
-    }
-    
-    @Override
-    public void mark(int readlimit)
-    {
-      if (xThreadInputStream != null)
-        xThreadInputStream.mark(readlimit);
-      else
-        super.mark(readlimit);
-    }
-    
-    @Override
-    public void reset()
-      throws IOException
-    {
-      if (xThreadInputStream != null)
-        xThreadInputStream.reset();
-      else
-        super.reset();
-    }
-    
-    @Override
-    public boolean markSupported()
-    {
-      if (xThreadInputStream != null)
-        return xThreadInputStream.markSupported();
-      return super.markSupported();
-    }
-    
-    @Override
-    public long skip(long n)
-      throws IOException
-    {
-      if (xThreadInputStream != null)
-        return xThreadInputStream.skip(n);
-      return super.skip(n);
-    }
-    
-    @Override
-    public int read(byte[] b, int off, int len)
-      throws IOException
-    {
-      if (xThreadInputStream != null)
-        return xThreadInputStream.read(b,off,len);
-      return super.read(b,off,len);
-    }
-
-    @Override
-    public int read(byte[] b)
-      throws IOException
-    {
-      if (xThreadInputStream != null)
-        return xThreadInputStream.read(b);
-      return super.read(b);
-    }
-    
-    @Override
-    public int read()
-      throws IOException
-    {
-      if (xThreadInputStream != null)
-        return xThreadInputStream.read();
-      return -1;
-    }
-    
-  }
-
   /**
    * The short version of getDocumentVersions. Get document versions given an
    * array of document identifiers. This method is called for EVERY document
@@ -1167,17 +1070,7 @@ public class DropboxRepositoryConnector 
       GetObjectThread objt = new GetObjectThread(documentIdentifiers[i]);
       try {
         objt.start();
-        objt.join();
-        Throwable thr = objt.getException();
-        if (thr != null) {
-          if (thr instanceof DropboxException) {
-            throw (DropboxException) thr;
-          } else if (thr instanceof RuntimeException) {
-            throw (RuntimeException) thr;
-          } else {
-            throw (Error) thr;
-          }
-        }
+        objt.finishUp();
       } catch (InterruptedException e) {
         objt.interrupt();
         throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,

Modified: manifoldcf/trunk/connectors/googledrive/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/googledrive/GoogleDriveRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/googledrive/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/googledrive/GoogleDriveRepositoryConnector.java?rev=1489686&r1=1489685&r2=1489686&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/googledrive/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/googledrive/GoogleDriveRepositoryConnector.java (original)
+++ manifoldcf/trunk/connectors/googledrive/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/googledrive/GoogleDriveRepositoryConnector.java Wed Jun  5 02:25:25 2013
@@ -721,29 +721,22 @@ public class GoogleDriveRepositoryConnec
     GetSeedsThread t = new GetSeedsThread(googleDriveQuery, seedBuffer);
     try {
       t.start();
-      
-      // Pick up the paths, and add them to the activities, before we join with the child thread.
-      while (true) {
-        // The only kind of exceptions this can throw are going to shut the process down.
-        String docPath = seedBuffer.fetch();
-        if (docPath ==  null)
-          break;
-        // Add the pageID to the queue
-        activities.addSeedDocument(docPath);
+      try {
+        // Pick up the paths, and add them to the activities, before we join with the child thread.
+        while (true) {
+          // The only kind of exceptions this can throw are going to shut the process down.
+          String docPath = seedBuffer.fetch();
+          if (docPath ==  null)
+            break;
+          // Add the pageID to the queue
+          activities.addSeedDocument(docPath);
+        }
+      } finally {
+        t.abort();
       }
 
-      t.join();
+      t.finishUp();
 
-      Throwable thr = t.getException();
-      if (thr != null) {
-        if (thr instanceof IOException) {
-          throw (IOException) thr;
-        } else if (thr instanceof RuntimeException) {
-          throw (RuntimeException) thr;
-        } else {
-          throw (Error) thr;
-        }
-      }
     } catch (InterruptedException e) {
       t.interrupt();
       throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
@@ -758,9 +751,6 @@ public class GoogleDriveRepositoryConnec
     } catch (IOException e) {
       Logging.connectors.warn("GOOGLEDRIVE: Error adding seed documents: " + e.getMessage(), e);
       handleIOException(e);
-    } finally {
-      // Make SURE buffer is dead, otherwise child thread may well hang waiting on it
-      seedBuffer.abandon();
     }
   }
   
@@ -786,8 +776,24 @@ public class GoogleDriveRepositoryConnec
       }
     }
 
-    public Throwable getException() {
-      return exception;
+    public void abort() {
+      seedBuffer.abandon();
+    }
+    
+    public void finishUp()
+      throws InterruptedException, IOException {
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof IOException)
+          throw (IOException) thr;
+        else if (thr instanceof RuntimeException)
+          throw (RuntimeException) thr;
+        else if (thr instanceof Error)
+          throw (Error) thr;
+        else
+          throw new RuntimeException("Unhandled exception of type: "+thr.getClass().getName(),thr);
+      }
     }
   }
 
@@ -885,146 +891,138 @@ public class GoogleDriveRepositoryConnec
         
     for (int i = 0; i < documentIdentifiers.length; i++) {
       long startTime = System.currentTimeMillis();
+      String errorCode = "FAILED";
+      String errorDesc = StringUtils.EMPTY;
+      Long fileSize = null;
+      boolean doLog = false;
       String nodeId = documentIdentifiers[i];
       String version = versions[i];
-      if (Logging.connectors.isDebugEnabled()) {
-        Logging.connectors.debug("GOOGLEDRIVE: Processing document identifier '"
-            + nodeId + "'");
-      }
-
-      File googleFile = getObject(nodeId);
-
-      String errorCode = "OK";
-      String errorDesc = StringUtils.EMPTY;
-      if (googleFile.containsKey("explicitlyTrashed") && googleFile.getExplicitlyTrashed()) {
-        //its deleted, move on
-        continue;
-      }
+      
+      try {
+        if (Logging.connectors.isDebugEnabled()) {
+          Logging.connectors.debug("GOOGLEDRIVE: Processing document identifier '"
+              + nodeId + "'");
+        }
 
+        File googleFile = getObject(nodeId);
 
-      if (Logging.connectors.isDebugEnabled()) {
-        Logging.connectors.debug("GOOGLEDRIVE: have this file:\t" + googleFile.getTitle());
-      }
+        if (googleFile.containsKey("explicitlyTrashed") && googleFile.getExplicitlyTrashed()) {
+          //its deleted, move on
+          continue;
+        }
 
-      if ("application/vnd.google-apps.folder".equals(googleFile.getMimeType())) {
-        //if directory add its children
 
         if (Logging.connectors.isDebugEnabled()) {
-          Logging.connectors.debug("GOOGLEDRIVE: its a directory");
+          Logging.connectors.debug("GOOGLEDRIVE: have this file:\t" + googleFile.getTitle());
         }
 
-        // adding all the children + subdirs for a folder
+        if ("application/vnd.google-apps.folder".equals(googleFile.getMimeType())) {
+          //if directory add its children
 
-        getSession();
-        XThreadStringBuffer childBuffer = new XThreadStringBuffer();
-        GetChildrenThread t = new GetChildrenThread(nodeId, childBuffer);
-        try {
-          t.start();
-          
-          // Pick up the paths, and add them to the activities, before we join with the child thread.
-          while (true) {
-            // The only kind of exceptions this can throw are going to shut the process down.
-            String child = childBuffer.fetch();
-            if (child ==  null)
-              break;
-            // Add the pageID to the queue
-            activities.addDocumentReference(child, nodeId, RELATIONSHIP_CHILD);
+          if (Logging.connectors.isDebugEnabled()) {
+            Logging.connectors.debug("GOOGLEDRIVE: its a directory");
           }
 
-          t.join();
+          // adding all the children + subdirs for a folder
 
-          Throwable thr = t.getException();
-          if (thr != null) {
-            if (thr instanceof IOException) {
-              throw (IOException) thr;
-            } else if (thr instanceof RuntimeException) {
-              throw (RuntimeException) thr;
-            } else {
-              throw (Error) thr;
+          getSession();
+          XThreadStringBuffer childBuffer = new XThreadStringBuffer();
+          GetChildrenThread t = new GetChildrenThread(nodeId, childBuffer);
+          try {
+            t.start();
+            try {
+              // Pick up the paths, and add them to the activities, before we join with the child thread.
+              while (true) {
+                // The only kind of exceptions this can throw are going to shut the process down.
+                String child = childBuffer.fetch();
+                if (child ==  null)
+                  break;
+                // Add the pageID to the queue
+                activities.addDocumentReference(child, nodeId, RELATIONSHIP_CHILD);
+              }
+            } finally {
+              t.abort();
             }
-          }
-        } catch (InterruptedException e) {
-          t.interrupt();
-          throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
-            ManifoldCFException.INTERRUPTED);
-        } catch (java.net.SocketTimeoutException e) {
-          Logging.connectors.warn("GOOGLEDRIVE: Socket timeout adding child documents: " + e.getMessage(), e);
-          handleIOException(e);
-        } catch (InterruptedIOException e) {
-          t.interrupt();
-          throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
-            ManifoldCFException.INTERRUPTED);
-        } catch (IOException e) {
-          Logging.connectors.warn("GOOGLEDRIVE: Error adding child documents: " + e.getMessage(), e);
-          handleIOException(e);
-        } finally {
-          // Make SURE buffer is dead, otherwise child thread may well hang waiting on it
-          childBuffer.abandon();
-        }
-
-      } else {
-        // its a file
-        if (!scanOnly[i]) {
-          if (Logging.connectors.isDebugEnabled()) {
-            Logging.connectors.debug("GOOGLEDRIVE: its a file");
+            t.finishUp();
+          } catch (InterruptedException e) {
+            t.interrupt();
+            throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
+              ManifoldCFException.INTERRUPTED);
+          } catch (java.net.SocketTimeoutException e) {
+            Logging.connectors.warn("GOOGLEDRIVE: Socket timeout adding child documents: " + e.getMessage(), e);
+            handleIOException(e);
+          } catch (InterruptedIOException e) {
+            t.interrupt();
+            throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
+              ManifoldCFException.INTERRUPTED);
+          } catch (IOException e) {
+            Logging.connectors.warn("GOOGLEDRIVE: Error adding child documents: " + e.getMessage(), e);
+            handleIOException(e);
           }
 
-          // We always direct to the PDF
-          String documentURI = getUrl(googleFile, "application/pdf");
+        } else {
+          // its a file
+          if (!scanOnly[i]) {
+            doLog = true;
 
-          // Get the file length
-          long fileLength = googleFile.getFileSize();
+            if (Logging.connectors.isDebugEnabled()) {
+              Logging.connectors.debug("GOOGLEDRIVE: its a file");
+            }
 
-          //otherwise process
-          RepositoryDocument rd = new RepositoryDocument();
+            // We always direct to the PDF
+            String documentURI = getUrl(googleFile, "application/pdf");
 
-          String mimeType = googleFile.getMimeType();
-          DateTime createdDate = googleFile.getCreatedDate();
-          DateTime modifiedDate = googleFile.getModifiedDate();
-          String extension = googleFile.getFileExtension();
-          String title = googleFile.getTitle();
-          
-          if (mimeType != null)
-            rd.setMimeType(mimeType);
-          if (createdDate != null)
-            rd.setCreatedDate(new Date(createdDate.getValue()));
-          if (modifiedDate != null)
-            rd.setModifiedDate(new Date(modifiedDate.getValue()));
-          if (extension != null)
-          {
-            if (title == null)
-              title = "";
-            rd.setFileName(title + "." + extension);
-          }
+            // Get the file length
+            long fileLength = googleFile.getFileSize();
 
-          for (Entry<String, Object> entry : googleFile.entrySet()) {
-            rd.addField(entry.getKey(), entry.getValue().toString());
-          }
+            //otherwise process
+            RepositoryDocument rd = new RepositoryDocument();
 
-          XThreadInputStream is = new XThreadInputStream();
-          try {
-            rd.setBinary(is, fileLength);
+            String mimeType = googleFile.getMimeType();
+            DateTime createdDate = googleFile.getCreatedDate();
+            DateTime modifiedDate = googleFile.getModifiedDate();
+            String extension = googleFile.getFileExtension();
+            String title = googleFile.getTitle();
             
+            if (mimeType != null)
+              rd.setMimeType(mimeType);
+            if (createdDate != null)
+              rd.setCreatedDate(new Date(createdDate.getValue()));
+            if (modifiedDate != null)
+              rd.setModifiedDate(new Date(modifiedDate.getValue()));
+            if (extension != null)
+            {
+              if (title == null)
+                title = "";
+              rd.setFileName(title + "." + extension);
+            }
+
+            for (Entry<String, Object> entry : googleFile.entrySet()) {
+              rd.addField(entry.getKey(), entry.getValue().toString());
+            }
+
             // Fire up the document reading thread
-            DocumentReadingThread t = new DocumentReadingThread(documentURI, is);
+            DocumentReadingThread t = new DocumentReadingThread(documentURI);
             try {
               t.start();
-              
-              // Can only index while background thread is running!
-              activities.ingestDocument(nodeId, version, documentURI, rd);
-
-              t.join();
-
-              Throwable thr = t.getException();
-              if (thr != null) {
-                if (thr instanceof IOException) {
-                  throw (IOException) thr;
-                } else if (thr instanceof RuntimeException) {
-                  throw (RuntimeException) thr;
-                } else {
-                  throw (Error) thr;
+              try {
+                InputStream is = t.getSafeInputStream();
+                try {
+                  // Can only index while background thread is running!
+                  rd.setBinary(is, fileLength);
+                  activities.ingestDocument(nodeId, version, documentURI, rd);
+                } finally {
+                  is.close();
                 }
+              } finally {
+                t.abort();
               }
+                
+              t.finishUp();
+
+              // No errors.  Record the fact that we made it.
+              errorCode = "OK";
+              fileSize = new Long(fileLength);
             } catch (InterruptedException e) {
               t.interrupt();
               throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
@@ -1037,37 +1035,17 @@ public class GoogleDriveRepositoryConnec
               throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
                 ManifoldCFException.INTERRUPTED);
             } catch (IOException e) {
-              Logging.connectors.warn("GOOGLEDRIVE: Error reading document: " + e.getMessage(), e);
-              handleIOException(e);
-            }
-          } finally {
-            try {
-              is.close();
-            } catch (java.net.SocketTimeoutException e) {
               errorCode = "IO ERROR";
               errorDesc = e.getMessage();
-              Logging.connectors.warn(
-                  "GOOGLEDRIVE: SocketTimeoutException closing file input stream: "
-                  + e.getMessage(), e);
-            } catch (InterruptedIOException e) {
-              errorCode = "Interrupted error";
-              errorDesc = e.getMessage();
-              throw new ManifoldCFException(e.getMessage(), e,
-                  ManifoldCFException.INTERRUPTED);
-            } catch (IOException e) {
-              errorCode = "IO ERROR";
-              errorDesc = e.getMessage();
-              Logging.connectors.warn(
-                  "GOOGLEDRIVE: IOException closing file input stream: "
-                  + e.getMessage(), e);
+              Logging.connectors.warn("GOOGLEDRIVE: Error reading document: " + e.getMessage(), e);
+              handleIOException(e);
             }
-
-            activities.recordActivity(new Long(startTime), ACTIVITY_READ,
-                fileLength, nodeId, errorCode, errorDesc, null);
           }
         }
-
-
+      } finally {
+        if (doLog)
+          activities.recordActivity(new Long(startTime), ACTIVITY_READ,
+            fileSize, nodeId, errorCode, errorDesc, null);
       }
     }
   }
@@ -1078,10 +1056,10 @@ public class GoogleDriveRepositoryConnec
     protected final String fileURL;
     protected final XThreadInputStream stream;
     
-    public DocumentReadingThread(String fileURL, XThreadInputStream stream) {
+    public DocumentReadingThread(String fileURL) {
       super();
       this.fileURL = fileURL;
-      this.stream = stream;
+      this.stream = new XThreadInputStream();
       setDaemon(true);
     }
 
@@ -1094,9 +1072,36 @@ public class GoogleDriveRepositoryConnec
       }
     }
 
-    public Throwable getException() {
-      return exception;
+    public InputStream getSafeInputStream() {
+      return stream;
+    }
+    
+    public void abort()
+    {
+      // This will be called during the finally
+      // block in the case where all is well (and
+      // the stream completed) and in the case where
+      // there were exceptions.
+      stream.abort();
     }
+    
+    public void finishUp()
+      throws InterruptedException, IOException
+    {
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof IOException)
+          throw (IOException) thr;
+        else if (thr instanceof RuntimeException)
+          throw (RuntimeException) thr;
+        else if (thr instanceof Error)
+          throw (Error) thr;
+        else
+          throw new RuntimeException("Unhandled exception of type: "+thr.getClass().getName(),thr);
+      }
+    }
+
   }
 
   /** Get the URL of a file in google land.
@@ -1131,8 +1136,24 @@ public class GoogleDriveRepositoryConnec
       }
     }
 
-    public Throwable getException() {
-      return exception;
+    public void abort() {
+      childBuffer.abandon();
+    }
+    
+    public void finishUp()
+      throws InterruptedException, IOException {
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof IOException)
+          throw (IOException) thr;
+        else if (thr instanceof RuntimeException)
+          throw (RuntimeException) thr;
+        else if (thr instanceof Error)
+          throw (Error) thr;
+        else
+          throw new RuntimeException("Unhandled exception of type: "+thr.getClass().getName(),thr);
+      }
     }
   }