Posted to commits@manifoldcf.apache.org by mi...@apache.org on 2013/06/25 10:23:03 UTC

svn commit: r1496377 - in /manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf: agents/output/hdfs/ crawler/connectors/hdfs/

Author: minoru
Date: Tue Jun 25 08:23:03 2013
New Revision: 1496377

URL: http://svn.apache.org/r1496377
Log: (empty)

Added:
    manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSSession.java   (with props)
Modified:
    manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java
    manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java

Modified: manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java?rev=1496377&r1=1496376&r2=1496377&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java Tue Jun 25 08:23:03 2013
@@ -44,6 +44,7 @@ import org.apache.manifoldcf.agents.inte
 import org.apache.manifoldcf.agents.interfaces.OutputSpecification;
 import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
 import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
+import org.apache.manifoldcf.agents.system.Logging;
 import org.apache.manifoldcf.agents.output.BaseOutputConnector;
 import org.apache.manifoldcf.agents.output.hdfs.HDFSOutputParam.ParameterEnum;
 import org.apache.manifoldcf.core.interfaces.ConfigParams;
@@ -134,11 +135,11 @@ public class HDFSOutputConnector extends
     try {
         fileSystem = FileSystem.get(new URI(nameNode), config, user);
     } catch (URISyntaxException e) {
-      Logging.connectors.warn("HDFS: Node name error: " + e.getMessage(), e);
+      Logging.agents.warn("HDFS: Node name error: " + e.getMessage(), e);
     } catch (IOException e) {
-      Logging.connectors.warn("HDFS: File system error: " + e.getMessage(), e);
+      Logging.agents.warn("HDFS: File system error: " + e.getMessage(), e);
     } catch (InterruptedException e) {
-      Logging.connectors.warn("HDFS: File system error: " + e.getMessage(), e);
+      Logging.agents.warn("HDFS: File system error: " + e.getMessage(), e);
     }
   }
 

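For reference, the connection logic patched above reduces to the standard Hadoop client call, FileSystem.get(URI, Configuration, String), which connects as the named user and can throw URISyntaxException, IOException, or InterruptedException -- the three cases logged above. A minimal standalone sketch, assuming a placeholder namenode URI and user name (neither is taken from this commit):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class HDFSConnectSketch {
      public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Connect as the given user; this blocks until the namenode answers.
        FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://localhost:9000"), config, "hadoop");
        try {
          System.out.println("Connected to " + fileSystem.getUri());
        } finally {
          fileSystem.close();
        }
      }
    }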
Modified: manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java?rev=1496377&r1=1496376&r2=1496377&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java Tue Jun 25 08:23:03 2013
@@ -18,16 +18,24 @@
 */
 package org.apache.manifoldcf.crawler.connectors.hdfs;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
+import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
 import org.apache.manifoldcf.core.interfaces.*;
-import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.connectors.hdfs.HDFSSession;
+import org.apache.manifoldcf.crawler.connectors.hdfs.Messages;
 import org.apache.manifoldcf.crawler.interfaces.*;
 import org.apache.manifoldcf.crawler.system.Logging;
+import org.apache.manifoldcf.core.common.XThreadInputStream;
+import org.apache.manifoldcf.core.common.XThreadStringBuffer;
 import org.apache.manifoldcf.core.extmimemap.ExtensionMimeMap;
+
+import java.security.GeneralSecurityException;
 import java.util.*;
 import java.io.*;
 import java.net.URI;
@@ -55,8 +63,16 @@ public class HDFSRepositoryConnector ext
   // Local data
   // protected File rootDirectory = null;
 
+  // Nodes and attributes
+  //private static final String JOB_STARTPOINT_NODE_TYPE = "startpoint";
+  //private static final String JOB_PATH_ATTRIBUTE = "path";
+
+  protected String nameNode = null;
+  protected String user = null;
   protected Configuration config = null;
-  protected FileSystem fileSystem = null;
+  protected HDFSSession session = null;
+  protected long lastSessionFetch = -1L;
+  protected static final long timeToRelease = 300000L;
 
   /*
    * Constructor.
@@ -65,16 +81,62 @@ public class HDFSRepositoryConnector ext
   {
   }
 
-  /* (non-Javadoc)
+  /** Tell the world what model this connector uses for getDocumentIdentifiers().
+   * This must return a model value as specified above.
+   *@return the model type value.
+   */
+  @Override
+  public int getConnectorModel()
+  {
+    return MODEL_CHAINED_ADD_CHANGE;
+  }
+
+  /** Return the list of relationship types that this connector recognizes.
+   *@return the list.
+   */
+  @Override
+  public String[] getRelationshipTypes()
+  {
+    return new String[]{RELATIONSHIP_CHILD};
+  }
+
+  /** List the activities we might report on.
+   */
+  @Override
+  public String[] getActivitiesList()
+  {
+    return activitiesList;
+  }
+
+  /** For any given document, list the bins that it is a member of.
+   */
+  @Override
+  public String[] getBinNames(String documentIdentifier)
+  {
+    return new String[]{"HDFS"};
+  }
+
+  /**
+   * Get the maximum number of documents to amalgamate together into one
+   * batch, for this connector.
+   *
+   * @return the maximum number. 0 indicates "unlimited".
+   */
+  @Override
+  public int getMaxDocumentRequest() {
+    return 1;
+  }
+
+  /* (non-Javadoc)
    * @see org.apache.manifoldcf.core.connector.BaseConnector#connect(org.apache.manifoldcf.core.interfaces.ConfigParams)
    */
   @Override
   public void connect(ConfigParams configParams) {
     super.connect(configParams);
 
-    String nameNode = configParams.getParameter("namenode");
+    nameNode = configParams.getParameter("namenode");
     
-    String user = configParams.getParameter("user");
+    user = configParams.getParameter("user");
     
     /*
      * make Configuration
@@ -87,170 +149,248 @@ public class HDFSRepositoryConnector ext
     } finally {
       Thread.currentThread().setContextClassLoader(ocl);
     }
-    
-    /*
-     * get connection to HDFS
-     */
-    try {
-      fileSystem = FileSystem.get(new URI(nameNode), config, user);
-	} catch (URISyntaxException e) {
-      Logging.connectors.warn("HDFS: Node name error: " + e.getMessage(), e);
-	} catch (IOException e) {
-      Logging.connectors.warn("HDFS: File system error: " + e.getMessage(), e);
-	} catch (InterruptedException e) {
-      Logging.connectors.warn("HDFS: File system error: " + e.getMessage(), e);
-	}
   }
 
-  /* (non-Javadoc)
+  /* (non-Javadoc)
    * @see org.apache.manifoldcf.core.connector.BaseConnector#disconnect()
    */
   @Override
   public void disconnect() throws ManifoldCFException {
-    try {
-      fileSystem.close();
-    } catch(IOException ex) {
-      throw new ManifoldCFException(ex);
+    if (session != null) {
+      session.close();
+      session = null;
+      lastSessionFetch = -1L;
     }
+  
     config.clear();
+    config = null;
+    user = null;
+    nameNode = null;
     super.disconnect();
   }
 
-  /** Tell the world what model this connector uses for getDocumentIdentifiers().
-   * This must return a model value as specified above.
-   *@return the model type value.
+  /**
+   * Set up a session
    */
-  @Override
-  public int getConnectorModel()
-  {
-    return MODEL_CHAINED_ADD_CHANGE;
+  protected void getSession() throws ManifoldCFException, ServiceInterruption {
+    if (session == null) {
+      if (StringUtils.isEmpty(nameNode)) {
+        throw new ManifoldCFException("Parameter namenode required but not set");
+      }
+      if (Logging.connectors.isDebugEnabled()) {
+        Logging.connectors.debug("HDFS: NameNode = '" + nameNode + "'");
+      }
+
+      if (StringUtils.isEmpty(user)) {
+        throw new ManifoldCFException("Parameter user required but not set");
+      }
+      if (Logging.connectors.isDebugEnabled()) {
+        Logging.connectors.debug("HDFS: User = '" + user + "'");
+      }
+      
+      GetSessionThread t = new GetSessionThread();
+      try {
+        t.start();
+        t.join();
+        Throwable thr = t.getException();
+        if (thr != null) {
+          if (thr instanceof IOException) {
+            throw (IOException) thr;
+          } else if (thr instanceof GeneralSecurityException) {
+            throw (GeneralSecurityException) thr;
+          } else if (thr instanceof RuntimeException) {
+            throw (RuntimeException) thr;
+          } else {
+            throw (Error) thr;
+          }
+        }
+      } catch (InterruptedException e) {
+        t.interrupt();
+        throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+      } catch (java.net.SocketTimeoutException e) {
+        Logging.connectors.warn("HDFS: Socket timeout: " + e.getMessage(), e);
+        handleIOException(e);
+      } catch (InterruptedIOException e) {
+        t.interrupt();
+        throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+      } catch (GeneralSecurityException e) {
+        Logging.connectors.error("HDFS: General security error initializing transport: " + e.getMessage(), e);
+        handleGeneralSecurityException(e);
+      } catch (IOException e) {
+        Logging.connectors.warn("HDFS: IO error: " + e.getMessage(), e);
+        handleIOException(e);
+      }
+    }
+    lastSessionFetch = System.currentTimeMillis();
   }
 
-  /** Return the list of relationship types that this connector recognizes.
-   *@return the list.
+  /**
+   * Test the connection. Returns a string describing the connection
+   * integrity.
+   *
+   * @return the connection's status as a displayable string.
    */
   @Override
-  public String[] getRelationshipTypes()
-  {
-    return new String[]{RELATIONSHIP_CHILD};
+  public String check() throws ManifoldCFException {
+    try {
+      checkConnection();
+      return super.check();
+    } catch (ServiceInterruption e) {
+      return "Connection temporarily failed: " + e.getMessage();
+    } catch (ManifoldCFException e) {
+      return "Connection failed: " + e.getMessage();
+    }
   }
 
-  /** List the activities we might report on.
+  /**
+   * Check the connection by fetching repository information from HDFS.
+   * @throws ManifoldCFException
+   * @throws ServiceInterruption
    */
-  @Override
-  public String[] getActivitiesList()
-  {
-    return activitiesList;
+  protected void checkConnection() throws ManifoldCFException, ServiceInterruption {
+    getSession();
+    CheckConnectionThread t = new CheckConnectionThread();
+    try {
+      t.start();
+      t.join();
+      Throwable thr = t.getException();
+      if (thr != null) {
+        if (thr instanceof IOException) {
+          throw (IOException) thr;
+        } else if (thr instanceof RuntimeException) {
+          throw (RuntimeException) thr;
+        } else {
+          throw (Error) thr;
+        }
+      }
+      return;
+    } catch (InterruptedException e) {
+      t.interrupt();
+      throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+    } catch (java.net.SocketTimeoutException e) {
+      Logging.connectors.warn("HDFS: Socket timeout: " + e.getMessage(), e);
+      handleIOException(e);
+    } catch (InterruptedIOException e) {
+      t.interrupt();
+      throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+    } catch (IOException e) {
+      Logging.connectors.warn("HDFS: Error checking repository: " + e.getMessage(), e);
+      handleIOException(e);
+    }
   }
 
-  /** For any given document, list the bins that it is a member of.
+  /**
+   * Periodic poll: release the HDFS session if it has been idle too long.
+   * @throws ManifoldCFException
    */
   @Override
-  public String[] getBinNames(String documentIdentifier)
-  {
-    /*
-    // Note: This code is for testing, so we can see how documents behave when they are in various kinds of bin situations.
-    // The testing model is that there are documents belonging to "SLOW", to "FAST", or both to "SLOW" and "FAST" bins.
-    // The connector chooses which bins to assign a document to based on the identifier (which is the document's path), so
-    // this is something that should NOT be duplicated by other connector implementers.
-    if (documentIdentifier.indexOf("/BOTH/") != -1 || (documentIdentifier.indexOf("/SLOW/") != -1 && documentIdentifier.indexOf("/FAST/") != -1))
-      return new String[]{"SLOW","FAST"};
-    if (documentIdentifier.indexOf("/SLOW/") != -1)
-      return new String[]{"SLOW"};
-    if (documentIdentifier.indexOf("/FAST/") != -1)
-      return new String[]{"FAST"};
-     */
-    return new String[]{""};
+  public void poll() throws ManifoldCFException {
+    if (lastSessionFetch == -1L) {
+      return;
+    }
+
+    long currentTime = System.currentTimeMillis();
+    if (currentTime >= lastSessionFetch + timeToRelease) {
+      if (session != null) {
+        session.close();
+        session = null;
+        lastSessionFetch = -1L;
+      }
+    }
   }
 
-  /** Convert a document identifier to a URI.  The URI is the URI that will be the unique key from
-  * the search index, and will be presented to the user as part of the search results.
-  *@param filePath is the document filePath.
-  *@param repositoryPath is the document repositoryPath.
-  *@return the document uri.
-  */
-  protected String convertToURI(String documentIdentifier, String[] repositoryPaths)
-    throws ManifoldCFException
-  {
-    //
-    // Note well:  This MUST be a legal URI!!!
-    try
-    {
-      String path = new Path(documentIdentifier).toString();
-      for (String repositoryPath : repositoryPaths) {
-        if (path.startsWith(repositoryPath)) {
-          StringBuffer sb = new StringBuffer();
-          path = path.replaceFirst(repositoryPath, "");
-          if (path.startsWith("/")) {
-            path = path.replaceFirst("/", "");
-          }
-          String[] tmp = path.split("/", 3);
-          String scheme = "";
-          String host = "";
-          String other = "";
-          try {
-            scheme = tmp[0];
-          } catch (ArrayIndexOutOfBoundsException e) {
-            scheme = "hdfs";
-          }
-          try {
-            host = tmp[1];
-          } catch (ArrayIndexOutOfBoundsException e) {
-            host = "localhost:9000";
-          }
+  /**
+   * Queue "seed" documents. Seed documents are the starting places for
+   * crawling activity. Documents are seeded when this method calls
+   * appropriate methods in the passed in ISeedingActivity object.
+   *
+   * This method can choose to find repository changes that happen only during
+   * the specified time interval. The seeds recorded by this method will be
+   * viewed by the framework based on what the getConnectorModel() method
+   * returns.
+   *
+   * It is not a big problem if the connector chooses to create more seeds
+   * than are strictly necessary; it is merely a question of overall work
+   * required.
+   *
+   * The times passed to this method may be interpreted for greatest
+   * efficiency. The time ranges any given job uses with this connector will
+   * not overlap, but will proceed starting at 0 and going to the "current
+   * time", each time the job is run. For continuous crawling jobs, this
+   * method will be called once, when the job starts, and at various periodic
+   * intervals as the job executes.
+   *
+   * When a job's specification is changed, the framework automatically resets
+   * the seeding start time to 0. The seeding start time may also be set to 0
+   * on each job run, depending on the connector model returned by
+   * getConnectorModel().
+   *
+   * Note that it is always ok to send MORE documents rather than less to this
+   * method.
+   *
+   * @param activities is the interface this method should use to perform
+   * whatever framework actions are desired.
+   * @param spec is a document specification (that comes from the job).
+   * @param startTime is the beginning of the time range to consider,
+   * inclusive.
+   * @param endTime is the end of the time range to consider, exclusive.
+   * @param jobMode is an integer describing how the job is being run, whether
+   * continuous or once-only.
+   */
+  @Override
+  public void addSeedDocuments(ISeedingActivity activities,
+      DocumentSpecification spec, long startTime, long endTime, int jobMode)
+      throws ManifoldCFException, ServiceInterruption {
+
+    String path = StringUtils.EMPTY;
+    int i = 0;
+    while (i < spec.getChildCount()) {
+      SpecificationNode sn = spec.getChild(i);
+      if (sn.getType().equals("startpoint")) {
+        path = sn.getAttributeValue("path");
+  
+        getSession();
+        GetSeedsThread t = new GetSeedsThread(path);
+        try {
+          t.start();
+          boolean wasInterrupted = false;
           try {
-            other = "/" + tmp[2];
-          } catch (ArrayIndexOutOfBoundsException e) {
-            other = "/";
+            XThreadStringBuffer seedBuffer = t.getBuffer();
+
+            // Pick up the paths, and add them to the activities, before we join with the child thread.
+            while (true) {
+              // The only kind of exceptions this can throw are going to shut the process down.
+              String docPath = seedBuffer.fetch();
+              if (docPath == null) {
+                break;
+              }
+              // Add the document path to the queue
+              activities.addSeedDocument(docPath);
+            }
+          } catch (InterruptedException e) {
+            wasInterrupted = true;
+            throw e;
+          } catch (ManifoldCFException e) {
+            if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) {
+              wasInterrupted = true;
+            }
+            throw e;
+          } finally {
+            if (!wasInterrupted) {
+              t.finishUp();
+            }
           }
-          return new URI(scheme + "://" + host + other).toURL().toString();
+        } catch (InterruptedException e) {
+          t.interrupt();
+          throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+        } catch (java.net.SocketTimeoutException e) {
+          Logging.connectors.warn("HDFS: Socket timeout adding seed documents: " + e.getMessage(), e);
+          handleIOException(e);
+        } catch (IOException e) {
+          Logging.connectors.warn("HDFS: Error adding seed documents: " + e.getMessage(), e);
+          handleIOException(e);
         }
       }
-      return convertToURI(documentIdentifier);
-    }
-    catch (URISyntaxException e)
-    {
-      throw new ManifoldCFException("Bad url",e);
-    }
-    catch (IOException e)
-    {
-      throw new ManifoldCFException("Bad url",e);
+      i++;
     }
   }
-  
-  /** Convert a document identifier to a URI.  The URI is the URI that will be the unique key from
-  * the search index, and will be presented to the user as part of the search results.
-  *@param documentIdentifier is the document identifier.
-  *@return the document uri.
-  */
-  protected String convertToURI(String documentIdentifier)
-    throws ManifoldCFException
-  {
-    //
-    // Note well:  This MUST be a legal URI!!!
-    return new Path(documentIdentifier).toUri().toString();
-  }
-
-
-  /** Given a document specification, get either a list of starting document identifiers (seeds),
-  * or a list of changes (deltas), depending on whether this is a "crawled" connector or not.
-  * These document identifiers will be loaded into the job's queue at the beginning of the
-  * job's execution.
-  * This method can return changes only (because it is provided a time range).  For full
-  * recrawls, the start time is always zero.
-  * Note that it is always ok to return MORE documents rather than less with this method.
-  *@param spec is a document specification (that comes from the job).
-  *@param startTime is the beginning of the time range to consider, inclusive.
-  *@param endTime is the end of the time range to consider, exclusive.
-  *@return the stream of local document identifiers that should be added to the queue.
-  */
-  @Override
-  public IDocumentIdentifierStream getDocumentIdentifiers(DocumentSpecification spec, long startTime, long endTime)
-    throws ManifoldCFException
-  {
-    return new IdentifierStream(spec);
-  }
-
 
   /** Get document versions given an array of document identifiers.
   * This method is called for EVERY document that is considered. It is therefore important to perform
@@ -289,61 +429,48 @@ public class HDFSRepositoryConnector ext
     }
 
     String[] rval = new String[documentIdentifiers.length];
-    try
-    {
-      i = 0;
-      while (i < rval.length)
-      {
-        Path path = new Path(documentIdentifiers[i]);
-        if (fileSystem.exists(path))
-        {
-          if (fileSystem.getFileStatus(path).isDir())
-          {
-            // It's a directory.  The version ID will be the
-            // last modified date.
-            long lastModified = fileSystem.getFileStatus(path).getModificationTime();
+    for (i = 0; i < rval.length; i++) {
+      getSession();
+      GetObjectThread objt = new GetObjectThread(documentIdentifiers[i]);
+      try {
+        objt.start();
+        objt.finishUp();
+      } catch (InterruptedException e) {
+        objt.interrupt();
+        throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+      }
+      
+      try {
+        Path path = objt.getResponse();
+        if (session.getFileSystem().exists(path)) {
+          if (session.getFileSystem().getFileStatus(path).isDir()) {
+            long lastModified = session.getFileSystem().getFileStatus(path).getModificationTime();
             rval[i] = new Long(lastModified).toString();
-
-            // Signal that we don't have any versioning.
-            // rval[i] = "";
-          }
-          else
-          {
-            // It's a file
-            long fileLength = fileSystem.getFileStatus(path).getLen();
-            if (activities.checkLengthIndexable(fileLength))
-            {
-              // Get the file's modified date.
-              long lastModified = fileSystem.getFileStatus(path).getModificationTime();
+          } else {
+            long fileLength = session.getFileSystem().getFileStatus(path).getLen();
+            if (activities.checkLengthIndexable(fileLength)) {
+              long lastModified = session.getFileSystem().getFileStatus(path).getModificationTime();
               StringBuilder sb = new StringBuilder();
-              if (filePathToUri)
-              {
+              if (filePathToUri) {
                 sb.append("+");
-              }
-              else
-              {
+              } else {
                 sb.append("-");
               }
               sb.append(new Long(lastModified).toString()).append(":").append(new Long(fileLength).toString());
               rval[i] = sb.toString();
-            }
-            else
-            {
+            } else {
               rval[i] = null;
             }
           }
-        }
-        else
-        {
+        } else {
           rval[i] = null;
         }
-        i++;
+      } catch (IOException e) {
+        objt.interrupt();
+        throw new ManifoldCFException(e);
       }
     }
-    catch (IOException e)
-    {
-      throw new ManifoldCFException(e);
-    }
+    
     return rval;
   }
 
@@ -371,8 +498,8 @@ public class HDFSRepositoryConnector ext
         String version = versions[i];
         String documentIdentifier = documentIdentifiers[i];
         Path path = new Path(documentIdentifier);
-        FileStatus fileStatus = fileSystem.getFileStatus(path);
-        if (fileSystem.exists(path))
+        FileStatus fileStatus = session.getFileSystem().getFileStatus(path);
+        if (session.getFileSystem().exists(path))
         {
           if (fileStatus.isDir())
           {
@@ -385,7 +512,7 @@ public class HDFSRepositoryConnector ext
             {
               try
               {
-                FileStatus[] fileStatuses = fileSystem.listStatus(path);
+                FileStatus[] fileStatuses = session.getFileSystem().listStatus(path);
                 if (fileStatuses != null)
                 {
                   int j = 0;
@@ -393,7 +520,7 @@ public class HDFSRepositoryConnector ext
                   {
                     FileStatus fs = fileStatuses[j++];
                     String canonicalPath = fs.getPath().toString();
-                    if (checkInclude(fileSystem.getUri().toString(),fs,canonicalPath,spec))
+                    if (checkInclude(session.getFileSystem().getUri().toString(),fs,canonicalPath,spec))
                       activities.addDocumentReference(canonicalPath,documentIdentifier,RELATIONSHIP_CHILD);
                   }
                 }
@@ -416,7 +543,7 @@ public class HDFSRepositoryConnector ext
             {
               // We've already avoided queuing documents that we don't want, based on file specifications.
               // We still need to check based on file data.
-              if (checkIngest(fileSystem.getUri().toString(),fileStatus,spec))
+              if (checkIngest(session.getFileSystem().getUri().toString(),fileStatus,spec))
               {
                 int j = 0;
 
@@ -431,7 +558,7 @@ public class HDFSRepositoryConnector ext
                   if (sn.getType().equals("startpoint"))
                   {
                     if (sn.getAttributeValue("path").length() > 0) {
-                      repositoryPaths.add(fileSystem.getUri().resolve(sn.getAttributeValue("path")).toString());
+                      repositoryPaths.add(session.getFileSystem().getUri().resolve(sn.getAttributeValue("path")).toString());
                     }
                   }
                 }
@@ -454,7 +581,7 @@ public class HDFSRepositoryConnector ext
                   // Ingest the document.
                   try
                   {
-                    FSDataInputStream is = fileSystem.open(path);
+                    FSDataInputStream is = session.getFileSystem().open(path);
                     try
                     {
                       long fileBytes = fileStatus.getLen();
@@ -504,18 +631,6 @@ public class HDFSRepositoryConnector ext
     }
   }
 
-  /** Map an extension to a mime type */
-  protected static String mapExtensionToMimeType(String fileName)
-  {
-    int slashIndex = fileName.lastIndexOf("/");
-    if (slashIndex != -1)
-      fileName = fileName.substring(slashIndex+1);
-    int dotIndex = fileName.lastIndexOf(".");
-    if (dotIndex == -1)
-      return null;
-    return ExtensionMimeMap.mapToMimeType(fileName.substring(dotIndex+1).toLowerCase(java.util.Locale.ROOT));
-  }
-
   // UI support methods.
   //
   // These support methods come in two varieties.  The first bunch is involved in setting up connection configuration information.  The second bunch
@@ -1206,7 +1321,87 @@ public class HDFSRepositoryConnector ext
 
   // Protected static methods
 
-  /** Check if a file or directory should be included, given a document specification.
+  /** Convert a document identifier to a URI.  The URI is the URI that will be the unique key from
+  * the search index, and will be presented to the user as part of the search results.
+  *@param documentIdentifier is the document identifier.
+  *@param repositoryPaths is the set of repository paths.
+  *@return the document uri.
+  */
+  protected String convertToURI(String documentIdentifier, String[] repositoryPaths)
+    throws ManifoldCFException
+  {
+    //
+    // Note well:  This MUST be a legal URI!!!
+    try
+    {
+      String path = new Path(documentIdentifier).toString();
+      for (String repositoryPath : repositoryPaths) {
+        if (path.startsWith(repositoryPath)) {
+          StringBuffer sb = new StringBuffer();
+          path = path.replaceFirst(repositoryPath, "");
+          if (path.startsWith("/")) {
+            path = path.replaceFirst("/", "");
+          }
+          String[] tmp = path.split("/", 3);
+          String scheme = "";
+          String host = "";
+          String other = "";
+          try {
+            scheme = tmp[0];
+          } catch (ArrayIndexOutOfBoundsException e) {
+            scheme = "hdfs";
+          }
+          try {
+            host = tmp[1];
+          } catch (ArrayIndexOutOfBoundsException e) {
+            host = "localhost:9000";
+          }
+          try {
+            other = "/" + tmp[2];
+          } catch (ArrayIndexOutOfBoundsException e) {
+            other = "/";
+          }
+          return new URI(scheme + "://" + host + other).toURL().toString();
+        }
+      }
+      return convertToURI(documentIdentifier);
+    }
+    catch (URISyntaxException e)
+    {
+      throw new ManifoldCFException("Bad url",e);
+    }
+    catch (IOException e)
+    {
+      throw new ManifoldCFException("Bad url",e);
+    }
+  }
+
+  /** Convert a document identifier to a URI.  The URI is the URI that will be the unique key from
+  * the search index, and will be presented to the user as part of the search results.
+  *@param documentIdentifier is the document identifier.
+  *@return the document uri.
+  */
+  protected String convertToURI(String documentIdentifier)
+    throws ManifoldCFException
+  {
+    //
+    // Note well:  This MUST be a legal URI!!!
+    return new Path(documentIdentifier).toUri().toString();
+  }
+
+  /** Map an extension to a mime type */
+  protected static String mapExtensionToMimeType(String fileName)
+  {
+    int slashIndex = fileName.lastIndexOf("/");
+    if (slashIndex != -1)
+      fileName = fileName.substring(slashIndex+1);
+    int dotIndex = fileName.lastIndexOf(".");
+    if (dotIndex == -1)
+      return null;
+    return ExtensionMimeMap.mapToMimeType(fileName.substring(dotIndex+1).toLowerCase(java.util.Locale.ROOT));
+  }
+
+  /** Check if a file or directory should be included, given a document specification.
   *@param fileName is the canonical file name.
   *@param documentSpecification is the specification.
   *@return true if it should be included.
@@ -1436,68 +1631,251 @@ public class HDFSRepositoryConnector ext
     }
   }
 
-  /** Document identifier stream.
-  */
-  protected static class IdentifierStream implements IDocumentIdentifierStream
+  /**
+   * Handle an IOException: rethrow interruptions as such, and treat all
+   * other IO errors as retryable service interruptions.
+   * @param e the exception to handle
+   * @throws ManifoldCFException
+   * @throws ServiceInterruption
+   */
+  private static void handleIOException(IOException e) throws ManifoldCFException, ServiceInterruption {
+    if (!(e instanceof java.net.SocketTimeoutException) && (e instanceof InterruptedIOException)) {
+      throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+    }
+    long currentTime = System.currentTimeMillis();
+    throw new ServiceInterruption("IO exception: "+e.getMessage(), e, currentTime + 300000L, currentTime + 3 * 60 * 60000L,-1,false);
+  }
+  
+  /**
+   * Handle a GeneralSecurityException: a permanent problem initializing
+   * the transport layer.
+   * @param e the exception to handle
+   * @throws ManifoldCFException
+   * @throws ServiceInterruption
+   */
+  private static void handleGeneralSecurityException(GeneralSecurityException e) throws ManifoldCFException, ServiceInterruption {
+    // Permanent problem: can't initialize transport layer
+    throw new ManifoldCFException("HDFS exception: "+e.getMessage(), e);
+  }
+
+  protected class CheckConnectionThread extends Thread {
+    protected Throwable exception = null;
+
+    public CheckConnectionThread() {
+      super();
+      setDaemon(true);
+    }
+
+    public void run() {
+      try {
+        session.getRepositoryInfo();
+      } catch (Throwable e) {
+        this.exception = e;
+      }
+    }
+
+    public Throwable getException() {
+      return exception;
+    }
+  }
+
+  protected class GetSessionThread extends Thread {
+    protected Throwable exception = null;
+
+    public GetSessionThread() {
+      super();
+      setDaemon(true);
+    }
+
+    public void run() {
+      try {
+        // Create a session
+        session = new HDFSSession(nameNode, config, user);
+      } catch (Throwable e) {
+        this.exception = e;
+      }
+    }
+
+    public Throwable getException() {
+      return exception;
+    }
+  }
+
+  protected class GetSeedsThread extends Thread {
+    protected Throwable exception = null;
+    protected final String path;
+    protected final XThreadStringBuffer seedBuffer;
+
+    public GetSeedsThread(String path) {
+      super();
+      this.path = path;
+      this.seedBuffer = new XThreadStringBuffer();
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      try {
+        session.getSeeds(seedBuffer, path);
+        seedBuffer.signalDone();
+      } catch (Throwable e) {
+        this.exception = e;
+      }
+    }
+
+    public XThreadStringBuffer getBuffer() {
+      return seedBuffer;
+    }
+
+    public void finishUp() throws InterruptedException {
+      seedBuffer.abandon();
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof RuntimeException) {
+          throw (RuntimeException) thr;
+        } else if (thr instanceof Error) {
+          throw (Error) thr;
+        } else {
+          throw new RuntimeException("Unhandled exception of type: "+thr.getClass().getName(),thr);
+        }
+      }
+    }
+  }
+
+  protected class GetObjectThread extends Thread {
+    protected final String nodeId;
+    protected Throwable exception = null;
+    protected Path response = null;
+
+    public GetObjectThread(String nodeId) {
+      super();
+      setDaemon(true);
+      this.nodeId = nodeId;
+    }
+
+    public void run() {
+      try {
+        response = session.getObject(nodeId);
+      } catch (Throwable e) {
+        this.exception = e;
+      }
+    }
+
+    public void finishUp() throws InterruptedException {
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof RuntimeException) {
+          throw (RuntimeException) thr;
+        } else if (thr instanceof Error) {
+          throw (Error) thr;
+        } else {
+          throw new RuntimeException("Unhandled exception of type: "+thr.getClass().getName(),thr);
+        }
+      }
+    }
+
+    public Path getResponse() {
+      return response;
+    }
+
+    public Throwable getException() {
+      return exception;
+    }
+  }
+
+  protected class BackgroundStreamThread extends Thread
   {
-    protected String[] ids = null;
-    protected int currentIndex = 0;
+    protected final String nodeId;
+    
+    protected boolean abortThread = false;
+    protected Throwable responseException = null;
+    protected InputStream sourceStream = null;
+    protected XThreadInputStream threadStream = null;
+    
+    public BackgroundStreamThread(String nodeId)
+    {
+      super();
+      setDaemon(true);
+      this.nodeId = nodeId;
+    }
 
-    public IdentifierStream(DocumentSpecification spec)
-      throws ManifoldCFException
+    public void run()
     {
-      // Walk the specification for the "startpoint" types.  Amalgamate these into a list of strings.
-      // Presume that all roots are startpoint nodes
-      int i = 0;
-      int j = 0;
-      while (i < spec.getChildCount())
-      {
-        SpecificationNode n = spec.getChild(i);
-        if (n.getType().equals("startpoint"))
-        {
-          j++;
+      try {
+        try {
+          synchronized (this) {
+            if (!abortThread) {
+              sourceStream = session.getFSDataInputStream(nodeId);
+              threadStream = new XThreadInputStream(sourceStream);
+              this.notifyAll();
+            }
+          }
+          
+          if (threadStream != null)
+          {
+            // Stuff the content until we are done
+            threadStream.stuffQueue();
+          }
+        } finally {
+          if (sourceStream != null) {
+            sourceStream.close();
+          }
         }
-        i++;
+      } catch (Throwable e) {
+        responseException = e;
       }
-      ids = new String[j];
-      i = 0;
-      j = 0;
-      while (i < ids.length)
+    }
+
+    public InputStream getSafeInputStream() throws InterruptedException, IOException
+    {
+      // Must wait until stream is created, or until we note an exception was thrown.
+      while (true)
       {
-        SpecificationNode n = spec.getChild(i);
-        if (n.getType().equals("startpoint"))
+        synchronized (this)
         {
-          // The id returned MUST be in canonical form!!!
-          ids[j] = new Path(n.getAttributeValue("path")).toString();
-          if (Logging.connectors.isDebugEnabled())
-          {
-            Logging.connectors.debug("Seed = '"+ids[j]+"'");
+          if (responseException != null) {
+            throw new IllegalStateException("Check for response before getting stream");
           }
-          j++;
+          checkException(responseException);
+          if (threadStream != null) {
+            return threadStream;
+          }
+          wait();
         }
-        i++;
       }
     }
+    
+    public void finishUp() throws InterruptedException, IOException
+    {
+      // This will be called during the finally
+      // block in the case where all is well (and
+      // the stream completed) and in the case where
+      // there were exceptions.
+      synchronized (this) {
+        if (threadStream != null) {
+          threadStream.abort();
+        }
+        abortThread = true;
+      }
+
+      join();
 
-    /** Get the next identifier.
-    *@return the next document identifier, or null if there are no more.
-    */
-    public String getNextIdentifier()
-      throws ManifoldCFException, ServiceInterruption
-    {
-      if (currentIndex == ids.length)
-        return null;
-      return ids[currentIndex++];
-    }
-
-    /** Close the stream.
-    */
-    public void close()
-      throws ManifoldCFException
+      checkException(responseException);
+    }
+    
+    protected synchronized void checkException(Throwable exception) throws IOException
     {
-      ids = null;
+      if (exception != null)
+      {
+        Throwable e = exception;
+        if (e instanceof IOException) {
+          throw (IOException)e;
+        } else if (e instanceof RuntimeException) {
+          throw (RuntimeException)e;
+        } else if (e instanceof Error) {
+          throw (Error)e;
+        } else {
+          throw new RuntimeException("Unhandled exception of type: "+e.getClass().getName(),e);
+        }
+      }
     }
-
   }
-
 }

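The recurring pattern in the new repository connector code -- run each blocking HDFS call on a daemon thread, join, and rethrow any captured Throwable on the calling thread -- exists because the framework may interrupt an agent thread at any time, while the Hadoop calls themselves are not reliably interruptible. A generic distillation of the idiom; the class and method names here are invented for illustration and are not part of the commit:

    import java.io.IOException;
    import java.util.concurrent.Callable;

    class WorkerThreadSketch<T> extends Thread {
      protected final Callable<T> work;           // the blocking HDFS call
      protected volatile Throwable exception = null;
      protected volatile T result = null;

      public WorkerThreadSketch(Callable<T> work) {
        super();
        setDaemon(true);                          // never keeps the JVM alive
        this.work = work;
      }

      public void run() {
        try {
          result = work.call();
        } catch (Throwable e) {
          this.exception = e;                     // captured for the caller
        }
      }

      public T finishUp() throws InterruptedException, IOException {
        join();                                   // the caller, not the worker, is interruptible here
        Throwable thr = exception;
        if (thr instanceof IOException) throw (IOException)thr;
        if (thr instanceof RuntimeException) throw (RuntimeException)thr;
        if (thr instanceof Error) throw (Error)thr;
        if (thr != null)
          throw new RuntimeException("Unhandled exception of type: "+thr.getClass().getName(),thr);
        return result;
      }
    }

For IO failures that escape this way, handleIOException above converts them into a ServiceInterruption that retries starting five minutes out (300000L ms) and gives up after three hours (3 * 60 * 60000L ms).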
Added: manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSSession.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSSession.java?rev=1496377&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSSession.java (added)
+++ manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSSession.java Tue Jun 25 08:23:03 2013
@@ -0,0 +1,108 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.manifoldcf.crawler.connectors.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.manifoldcf.core.common.*;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Session object wrapping a Hadoop FileSystem instance on behalf of the
+ * HDFS repository connector and its background threads.
+ *
+ * @author andrew
+ */
+public class HDFSSession {
+
+  private FileSystem fileSystem;
+  private String nameNode;
+  private Configuration config;
+  private String user;
+  
+  public HDFSSession(String nameNode, Configuration config, String user) throws URISyntaxException, IOException, InterruptedException {
+    this.nameNode = nameNode;
+    this.config = config;
+    this.user = user;
+    fileSystem = FileSystem.get(new URI(nameNode), config, user);
+  }
+
+  public Map<String, String> getRepositoryInfo() {
+    Map<String, String> info = new HashMap<String, String>();
+
+    info.put("Name Node", nameNode);
+    info.put("Config", config.toString());
+    info.put("User", user);
+    info.put("Canonical Service Name", fileSystem.getCanonicalServiceName());
+    info.put("Default Block Size", Long.toString(fileSystem.getDefaultBlockSize()));
+    info.put("Default Replication", Short.toString(fileSystem.getDefaultReplication()));
+    info.put("Home Directory", fileSystem.getHomeDirectory().toUri().toString());
+    info.put("Working Directory", fileSystem.getWorkingDirectory().toUri().toString());
+    return info;
+  }
+
+  public void getSeeds(XThreadStringBuffer idBuffer, String path)
+    throws IOException, InterruptedException {
+
+    /*
+     * Add the root path itself, so that single files such as /file1 still get read.
+     */
+    idBuffer.add(path);
+    
+    /*
+     * Get a listing of the folder's entire contents: subfolders and files.
+     */
+    FileStatus[] fileStatuses = fileSystem.listStatus(new Path(path));
+    for (FileStatus fileStatus : fileStatuses) {
+      /*
+       * Only add the directories as seeds; the files are picked up later.
+       */
+      if (fileStatus.isDir()) {
+        idBuffer.add(fileStatus.getPath().toUri().toString());
+      }
+    }
+  }
+  
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+  
+  public Path getObject(String id) {
+    return new Path(id);
+  }
+
+  public FSDataInputStream getFSDataInputStream(String id) throws IOException {
+    return fileSystem.open(new Path(id));
+  }
+  
+  public void close() throws IOException {
+    fileSystem.close();
+  }
+}

Propchange: manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSSession.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
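
Assuming the new class compiles as shown, the expected calling sequence for HDFSSession is straightforward. In this sketch the namenode URI and user name are invented for illustration:

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.manifoldcf.crawler.connectors.hdfs.HDFSSession;

    public class HDFSSessionSketch {
      public static void main(String[] args) throws Exception {
        HDFSSession session = new HDFSSession(
            "hdfs://localhost:9000", new Configuration(), "hadoop");
        try {
          // getRepositoryInfo() doubles as a connection check; it is what
          // CheckConnectionThread invokes in the repository connector.
          for (Map.Entry<String, String> entry :
              session.getRepositoryInfo().entrySet()) {
            System.out.println(entry.getKey() + " = " + entry.getValue());
          }
        } finally {
          session.close();
        }
      }
    }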