You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/07/02 19:08:36 UTC

svn commit: r1499023 - in /manifoldcf/trunk: ./ connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/ connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/

Author: kwright
Date: Tue Jul  2 17:08:35 2013
New Revision: 1499023

URL: http://svn.apache.org/r1499023
Log:
Fix for CONNECTORS-744.

Added:
    manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java   (with props)
Removed:
    manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConstant.java
Modified:
    manifoldcf/trunk/CHANGES.txt
    manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConfig.java
    manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java
    manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputSpecs.java
    manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/ParameterEnum.java
    manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java

Modified: manifoldcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/CHANGES.txt?rev=1499023&r1=1499022&r2=1499023&view=diff
==============================================================================
--- manifoldcf/trunk/CHANGES.txt (original)
+++ manifoldcf/trunk/CHANGES.txt Tue Jul  2 17:08:35 2013
@@ -3,6 +3,9 @@ $Id$
 
 ======================= 1.3-dev =====================
 
+CONNECTORS-744: Use background threads in HDFS output connector.
+(Karl Wright)
+
 CONNECTORS-742: Document HDFS connector.
 (Karl Wright)
 

Modified: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConfig.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConfig.java?rev=1499023&r1=1499022&r2=1499023&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConfig.java (original)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConfig.java Tue Jul  2 17:08:35 2013
@@ -32,9 +32,9 @@ public class HDFSOutputConfig extends HD
 
   /** Parameters used for the configuration */
   final private static ParameterEnum[] CONFIGURATIONLIST = {
-    ParameterEnum.NAMENODEHOST,
-    ParameterEnum.NAMENODEPORT,
-    ParameterEnum.USER
+    ParameterEnum.namenodehost,
+    ParameterEnum.namenodeport,
+    ParameterEnum.user
   };
 
   /** Build a set of ElasticSearchParameters by reading ConfigParams. If the

Modified: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java?rev=1499023&r1=1499022&r2=1499023&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java (original)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java Tue Jul  2 17:08:35 2013
@@ -29,9 +29,6 @@ import java.io.UnsupportedEncodingExcept
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLEncoder;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -88,8 +85,12 @@ public class HDFSOutputConnector extends
   /** Forward to the template to view the specification parameters for the job */
   private static final String VIEW_SPECIFICATION_HTML = "viewSpecification.html";
 
-  protected Configuration config = null;
-  protected FileSystem fileSystem = null;
+  protected String nameNodeHost = null;
+  protected String nameNodePort = null;
+  protected String user = null;
+  protected HDFSSession session = null;
+  protected long lastSessionFetch = -1L;
+  protected static final long timeToRelease = 300000L;
 
   /** Constructor.
    */
@@ -112,64 +113,113 @@ public class HDFSOutputConnector extends
   @Override
   public void connect(ConfigParams configParams) {
     super.connect(configParams);
-    
+    nameNodeHost = configParams.getParameter(ParameterEnum.namenodehost.name());
+    nameNodePort = configParams.getParameter(ParameterEnum.namenodeport.name());
+    user = configParams.getParameter(ParameterEnum.user.name());
   }
 
   /** Close the connection.  Call this before discarding the connection.
    */
   @Override
   public void disconnect() throws ManifoldCFException {
-    try {
-      fileSystem.close();
-    } catch(IOException ex) {
-      throw new ManifoldCFException(ex);
-    }
-    config.clear();
+    closeSession();
+    nameNodeHost = null;
+    nameNodePort = null;
+    user = null;
     super.disconnect();
   }
 
-  /** Set up a session */
-  protected void getSession() throws ManifoldCFException, ServiceInterruption {
-    String nameNodeHost = params.getParameter(ParameterEnum.NAMENODEHOST.name());
-    if (nameNodeHost == null)
-      throw new ManifoldCFException("Namenodehost must be specified");
-
-    String nameNodePort = params.getParameter(ParameterEnum.NAMENODEPORT.name());
-    if (nameNodePort == null)
-      throw new ManifoldCFException("Namenodeport must be specified");
-    
-    String user = params.getParameter(ParameterEnum.USER.name());
-    if (user == null)
-      throw new ManifoldCFException("User must be specified");
-    
-    String nameNode = "hdfs://"+nameNodeHost+":"+nameNodePort;
+  /**
+   * @throws ManifoldCFException
+   */
+  @Override
+  public void poll() throws ManifoldCFException {
+    if (lastSessionFetch == -1L) {
+      return;
+    }
 
-    /*
-     * make Configuration
-     */
-    ClassLoader ocl = Thread.currentThread().getContextClassLoader();
-    try {
-      Thread.currentThread().setContextClassLoader(org.apache.hadoop.conf.Configuration.class.getClassLoader());
-      config = new Configuration();
-      config.set("fs.default.name", nameNode);
-    } finally {
-      Thread.currentThread().setContextClassLoader(ocl);
+    long currentTime = System.currentTimeMillis();
+    if (currentTime >= lastSessionFetch + timeToRelease) {
+      closeSession();
     }
-    
-    /*
-     * get connection to HDFS
-     */
-    try {
-      fileSystem = FileSystem.get(new URI(nameNode), config, user);
-    } catch (URISyntaxException e) {
-      handleURISyntaxException(e);
-      throw new ManifoldCFException(e.getMessage(),e);
-    } catch (IOException e) {
-      handleIOException(e);
-    } catch (InterruptedException e) {
-      throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
+  }
+
+  protected void closeSession()
+    throws ManifoldCFException {
+    if (session != null) {
+      try {
+        // This can in theory throw an IOException, so it is possible it is doing socket
+        // communication.  In practice, it's unlikely that there's any real IO, so I'm
+        // NOT putting it in a background thread for now.
+        session.close();
+      } catch (InterruptedIOException e) {
+        throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+      } catch (IOException e) {
+        Logging.agents.warn("HDFS: Error closing connection: "+e.getMessage(),e);
+        // Eat the exception
+      } finally {
+        session = null;
+        lastSessionFetch = -1L;
+      }
     }
+  }
+
+  /** Set up a session */
+  protected HDFSSession getSession() throws ManifoldCFException, ServiceInterruption {
+    if (session == null) {
+      String nameNodeHost = params.getParameter(ParameterEnum.namenodehost.name());
+      if (nameNodeHost == null)
+        throw new ManifoldCFException("Namenodehost must be specified");
+
+      String nameNodePort = params.getParameter(ParameterEnum.namenodeport.name());
+      if (nameNodePort == null)
+        throw new ManifoldCFException("Namenodeport must be specified");
+      
+      String user = params.getParameter(ParameterEnum.user.name());
+      if (user == null)
+        throw new ManifoldCFException("User must be specified");
+      
+      String nameNode = "hdfs://"+nameNodeHost+":"+nameNodePort;
+      //System.out.println("Namenode = '"+nameNode+"'");
 
+      /*
+       * make Configuration
+       */
+      Configuration config = null;
+      ClassLoader ocl = Thread.currentThread().getContextClassLoader();
+      try {
+        Thread.currentThread().setContextClassLoader(org.apache.hadoop.conf.Configuration.class.getClassLoader());
+        config = new Configuration();
+        config.set("fs.default.name", nameNode);
+      } finally {
+        Thread.currentThread().setContextClassLoader(ocl);
+      }
+      
+      /*
+       * get connection to HDFS
+       */
+      GetSessionThread t = new GetSessionThread(nameNode,config,user);
+      try {
+        t.start();
+        t.finishUp();
+      } catch (InterruptedException e) {
+        t.interrupt();
+        throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+      } catch (java.net.SocketTimeoutException e) {
+        handleIOException(e);
+      } catch (InterruptedIOException e) {
+        t.interrupt();
+        handleIOException(e);
+      } catch (URISyntaxException e) {
+        handleURISyntaxException(e);
+      } catch (IOException e) {
+        handleIOException(e);
+      }
+      
+      session = t.getResult();
+    }
+    lastSessionFetch = System.currentTimeMillis();
+    return session;
   }
 
   /** Test the connection.  Returns a string describing the connection integrity.
@@ -178,10 +228,12 @@ public class HDFSOutputConnector extends
   @Override
   public String check() throws ManifoldCFException {
     try {
-      getSession();
+      checkConnection();
       return super.check();
     } catch (ServiceInterruption e) {
-      return "Transient error: "+e.getMessage();
+      return "Connection temporarily failed: " + e.getMessage();
+    } catch (ManifoldCFException e) {
+      return "Connection failed: " + e.getMessage();
     }
   }
 
@@ -218,17 +270,9 @@ public class HDFSOutputConnector extends
    */
   @Override
   public int addOrReplaceDocument(String documentURI, String outputDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities) throws ManifoldCFException, ServiceInterruption {
-    // Establish a session
-    getSession();
 
-    HDFSOutputConfig config = getConfigParameters(null);
-
-    HDFSOutputSpecs specs = null;
-    InputStream input = null;
-    FSDataOutputStream output = null;
-    FileLock lock = null;
     try {
-      specs = new HDFSOutputSpecs(outputDescription);
+      HDFSOutputSpecs specs = new HDFSOutputSpecs(outputDescription);
 
       /*
        * make file path
@@ -241,54 +285,18 @@ public class HDFSOutputConnector extends
       strBuff.append(documentURItoFilePath(documentURI));
       Path path = new Path(strBuff.toString());
 
-      /*
-       * make directory
-       */
-      if (!fileSystem.exists(path.getParent())) {
-        fileSystem.mkdirs(path.getParent());
-      }
-
-      /*
-       * delete old file
-       */
-      if (fileSystem.exists(path)) {
-        fileSystem.delete(path, true);
-      }
-
-      input = document.getBinaryStream();
-      output = fileSystem.create(path);
-
-      /*
-       * write file
-       */
-      byte buf[] = new byte[65536];
-      int len;
-      while((len = input.read(buf)) != -1) {
-        output.write(buf, 0, len);
-      }
-      output.flush();
+      Long startTime = new Long(System.currentTimeMillis());
+      createFile(path, document.getBinaryStream());
+      activities.recordActivity(startTime, INGEST_ACTIVITY, new Long(document.getBinaryLength()), documentURI, "OK", null);
+      return DOCUMENTSTATUS_ACCEPTED;
     } catch (JSONException e) {
       handleJSONException(e);
       return DOCUMENTSTATUS_REJECTED;
     } catch (URISyntaxException e) {
       handleURISyntaxException(e);
       return DOCUMENTSTATUS_REJECTED;
-    } catch (IOException e) {
-      handleIOException(e);
-      return DOCUMENTSTATUS_REJECTED;
-    } finally {
-      try {
-        input.close();
-      } catch (IOException e) {
-      }
-      try {
-        output.close();
-      } catch (IOException e) {
-      }
     }
 
-    activities.recordActivity(null, INGEST_ACTIVITY, new Long(document.getBinaryLength()), documentURI, "OK", null);
-    return DOCUMENTSTATUS_ACCEPTED;
   }
 
   /** Remove a document using the connector.
@@ -300,14 +308,9 @@ public class HDFSOutputConnector extends
    */
   @Override
   public void removeDocument(String documentURI, String outputDescription, IOutputRemoveActivity activities) throws ManifoldCFException, ServiceInterruption {
-    // Establish a session
-    getSession();
 
-    HDFSOutputConfig config = getConfigParameters(null);
-
-    HDFSOutputSpecs specs = null;
     try {
-      specs = new HDFSOutputSpecs(outputDescription);
+      HDFSOutputSpecs specs = new HDFSOutputSpecs(outputDescription);
 
       /*
        * make path
@@ -319,22 +322,14 @@ public class HDFSOutputConnector extends
       strBuff.append("/");
       strBuff.append(documentURItoFilePath(documentURI));
       Path path = new Path(strBuff.toString());
-
-      /*
-       * delete old file
-       */
-      if (fileSystem.exists(path)) {
-        fileSystem.delete(path, true);
-      }
+      Long startTime = new Long(System.currentTimeMillis());
+      deleteFile(path);
+      activities.recordActivity(startTime, REMOVE_ACTIVITY, null, documentURI, "OK", null);
     } catch (JSONException e) {
       handleJSONException(e);
     } catch (URISyntaxException e) {
       handleURISyntaxException(e);
-    } catch (IOException e) {
-      handleIOException(e);
     }
-
-    activities.recordActivity(null, REMOVE_ACTIVITY, null, documentURI, "OK", null);
   }
 
   /** Output the configuration header section.
@@ -437,7 +432,7 @@ public class HDFSOutputConnector extends
     ConfigurationNode specNode = getSpecNode(os);
     boolean bAdd = (specNode == null);
     if (bAdd) {
-      specNode = new SpecificationNode(HDFSOutputConstant.PARAM_ROOTPATH);
+      specNode = new SpecificationNode(ParameterEnum.rootpath.name());
     }
     HDFSOutputSpecs.contextToSpecNode(variableContext, specNode);
     if (bAdd) {
@@ -467,7 +462,7 @@ public class HDFSOutputConnector extends
     int l = os.getChildCount();
     for (int i = 0; i < l; i++) {
       SpecificationNode node = os.getChild(i);
-      if (node.getType().equals(HDFSOutputConstant.PARAM_ROOTPATH)) {
+      if (node.getType().equals(ParameterEnum.rootpath.name())) {
         return node;
       }
     }
@@ -591,7 +586,222 @@ public class HDFSOutputConnector extends
       throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
     }
     long currentTime = System.currentTimeMillis();
+    Logging.agents.warn("HDFS output connection: IO exception: "+e.getMessage(),e);
     throw new ServiceInterruption("IO exception: "+e.getMessage(), e, currentTime + 300000L, currentTime + 3 * 60 * 60000L,-1,false);
   }
-  
+
+  protected static class CreateFileThread extends Thread {
+    protected final HDFSSession session;
+    protected final Path path;
+    protected final InputStream input;
+    protected Throwable exception = null;
+
+    public CreateFileThread(HDFSSession session, Path path, InputStream input) {
+      super();
+      this.session = session;
+      this.path = path;
+      this.input = input;
+      setDaemon(true);
+    }
+
+    public void run() {
+      try {
+        session.createFile(path,input);
+      } catch (Throwable e) {
+        this.exception = e;
+      }
+    }
+
+    public void finishUp() throws InterruptedException, IOException {
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof IOException) {
+          throw (IOException) thr;
+        } else if (thr instanceof RuntimeException) {
+          throw (RuntimeException) thr;
+        } else {
+          throw (Error) thr;
+        }
+      }
+    }
+  }
+
+  protected void createFile(Path path, InputStream input)
+    throws ManifoldCFException, ServiceInterruption {
+    CreateFileThread t = new CreateFileThread(getSession(), path, input);
+    try {
+      t.start();
+      t.finishUp();
+    } catch (InterruptedException e) {
+      t.interrupt();
+      throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+    } catch (java.net.SocketTimeoutException e) {
+      handleIOException(e);
+    } catch (InterruptedIOException e) {
+      t.interrupt();
+      handleIOException(e);
+    } catch (IOException e) {
+      handleIOException(e);
+    }
+  }
+
+  protected static class DeleteFileThread extends Thread {
+    protected final HDFSSession session;
+    protected final Path path;
+    protected Throwable exception = null;
+
+    public DeleteFileThread(HDFSSession session, Path path) {
+      super();
+      this.session = session;
+      this.path = path;
+      setDaemon(true);
+    }
+
+    public void run() {
+      try {
+        session.deleteFile(path);
+      } catch (Throwable e) {
+        this.exception = e;
+      }
+    }
+
+    public void finishUp() throws InterruptedException, IOException {
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof IOException) {
+          throw (IOException) thr;
+        } else if (thr instanceof RuntimeException) {
+          throw (RuntimeException) thr;
+        } else {
+          throw (Error) thr;
+        }
+      }
+    }
+  }
+
+  protected void deleteFile(Path path)
+    throws ManifoldCFException, ServiceInterruption {
+    // Establish a session
+    DeleteFileThread t = new DeleteFileThread(getSession(),path);
+    try {
+      t.start();
+      t.finishUp();
+    } catch (InterruptedException e) {
+      t.interrupt();
+      throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+    } catch (java.net.SocketTimeoutException e) {
+      handleIOException(e);
+    } catch (InterruptedIOException e) {
+      t.interrupt();
+      handleIOException(e);
+    } catch (IOException e) {
+      handleIOException(e);
+    }
+  }
+
+
+  protected static class CheckConnectionThread extends Thread {
+    protected final HDFSSession session;
+    protected Throwable exception = null;
+
+    public CheckConnectionThread(HDFSSession session) {
+      super();
+      this.session = session;
+      setDaemon(true);
+    }
+
+    public void run() {
+      try {
+        session.getRepositoryInfo();
+      } catch (Throwable e) {
+        this.exception = e;
+      }
+    }
+
+    public void finishUp() throws InterruptedException, IOException {
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof IOException) {
+          throw (IOException) thr;
+        } else if (thr instanceof RuntimeException) {
+          throw (RuntimeException) thr;
+        } else {
+          throw (Error) thr;
+        }
+      }
+    }
+  }
+
+  /**
+   * @throws ManifoldCFException
+   * @throws ServiceInterruption
+   */
+  protected void checkConnection() throws ManifoldCFException, ServiceInterruption {
+    CheckConnectionThread t = new CheckConnectionThread(getSession());
+    try {
+      t.start();
+      t.finishUp();
+      return;
+    } catch (InterruptedException e) {
+      t.interrupt();
+      throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+    } catch (java.net.SocketTimeoutException e) {
+      handleIOException(e);
+    } catch (InterruptedIOException e) {
+      t.interrupt();
+      handleIOException(e);
+    } catch (IOException e) {
+      handleIOException(e);
+    }
+  }
+
+  protected static class GetSessionThread extends Thread {
+    protected final String nameNode;
+    protected final Configuration config;
+    protected final String user;
+    protected Throwable exception = null;
+    protected HDFSSession session = null;
+
+    public GetSessionThread(String nameNode, Configuration config, String user) {
+      super();
+      this.nameNode = nameNode;
+      this.config = config;
+      this.user = user;
+      setDaemon(true);
+    }
+
+    public void run() {
+      try {
+        // Create a session
+        session = new HDFSSession(nameNode, config, user);
+      } catch (Throwable e) {
+        this.exception = e;
+      }
+    }
+
+    public void finishUp()
+      throws InterruptedException, IOException, URISyntaxException {
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof IOException) {
+          throw (IOException) thr;
+        } else if (thr instanceof URISyntaxException) {
+          throw (URISyntaxException) thr;
+        } else if (thr instanceof RuntimeException) {
+          throw (RuntimeException) thr;
+        } else {
+          throw (Error) thr;
+        }
+      }
+    }
+    
+    public HDFSSession getResult() {
+      return session;
+    }
+  }
+
 }

Modified: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputSpecs.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputSpecs.java?rev=1499023&r1=1499022&r2=1499023&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputSpecs.java (original)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputSpecs.java Tue Jul  2 17:08:35 2013
@@ -39,7 +39,7 @@ public class HDFSOutputSpecs extends HDF
   private static final long serialVersionUID = 1145652730572662025L;
 
   final public static ParameterEnum[] SPECIFICATIONLIST = {
-    ParameterEnum.ROOTPATH
+    ParameterEnum.rootpath
   };
 
   private String rootPath;
@@ -118,7 +118,7 @@ public class HDFSOutputSpecs extends HDF
    * @return
    */
   public String getRootPath() {
-    return get(ParameterEnum.ROOTPATH);
+    return get(ParameterEnum.rootpath);
   }
 
   /**
@@ -142,7 +142,7 @@ public class HDFSOutputSpecs extends HDF
       }
       return set;
     } catch (IOException e) {
-      throw new ManifoldCFException(e);
+      throw new ManifoldCFException(e.getMessage(),e);
     } finally {
       if (br != null) {
         IOUtils.closeQuietly(br);

Added: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java?rev=1499023&view=auto
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java (added)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java Tue Jul  2 17:08:35 2013
@@ -0,0 +1,119 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.manifoldcf.agents.output.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.manifoldcf.core.common.*;
+
+import java.io.InputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ */
+public class HDFSSession {
+
+  private FileSystem fileSystem;
+  private final String nameNode;
+  private final Configuration config;
+  private final String user;
+  
+  public HDFSSession(String nameNode, Configuration config, String user) throws URISyntaxException, IOException, InterruptedException {
+    this.nameNode = nameNode;
+    this.config = config;
+    this.user = user;
+    fileSystem = FileSystem.get(new URI(nameNode), config, user);
+  }
+
+  public Map<String, String> getRepositoryInfo() {
+    Map<String, String> info = new HashMap<String, String>();
+
+    info.put("Name Node", nameNode);
+    info.put("Config", config.toString());
+    info.put("User", user);
+    info.put("Canonical Service Name", fileSystem.getCanonicalServiceName());
+    info.put("Default Block Size", Long.toString(fileSystem.getDefaultBlockSize()));
+    info.put("Default Replication", Short.toString(fileSystem.getDefaultReplication()));
+    info.put("Home Directory", fileSystem.getHomeDirectory().toUri().toString());
+    info.put("Working Directory", fileSystem.getWorkingDirectory().toUri().toString());
+    return info;
+  }
+
+  public void deleteFile(Path path)
+    throws IOException {
+    if (fileSystem.exists(path)) {
+      fileSystem.delete(path, true);
+    }
+  }
+
+  public void createFile(Path path, InputStream input)
+    throws IOException {
+    /*
+      * make directory
+      */
+    if (!fileSystem.exists(path.getParent())) {
+      fileSystem.mkdirs(path.getParent());
+    }
+
+    /*
+      * delete old file
+      */
+    if (fileSystem.exists(path)) {
+      fileSystem.delete(path, true);
+    }
+
+    FSDataOutputStream output = fileSystem.create(path);
+    try {
+      /*
+       * write file
+       */
+      byte buf[] = new byte[65536];
+      int len;
+      while((len = input.read(buf)) != -1) {
+        output.write(buf, 0, len);
+      }
+      output.flush();
+    } finally {
+      output.close();
+    }
+
+    // Do NOT close input; it's closed by the caller.
+  }
+
+  public URI getUri() {
+    return fileSystem.getUri();
+  }
+
+  public void close() throws IOException {
+    fileSystem.close();
+  }
+}

Propchange: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java
------------------------------------------------------------------------------
    svn:keywords = Id

Modified: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/ParameterEnum.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/ParameterEnum.java?rev=1499023&r1=1499022&r2=1499023&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/ParameterEnum.java (original)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/ParameterEnum.java Tue Jul  2 17:08:35 2013
@@ -24,10 +24,10 @@ import java.util.Map;
 
 /** Parameters constants */
 public enum ParameterEnum {
-  NAMENODEHOST("localhost"),
-  NAMENODEPORT("9000"),
-  USER(""),
-  ROOTPATH("");
+  namenodehost("localhost"),
+  namenodeport("9000"),
+  user(""),
+  rootpath("");
 
   final protected String defaultValue;
 

Modified: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java?rev=1499023&r1=1499022&r2=1499023&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java (original)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java Tue Jul  2 17:08:35 2013
@@ -57,7 +57,6 @@ public class HDFSRepositoryConnector ext
   protected String nameNodeHost = null;
   protected String nameNodePort = null;
   protected String user = null;
-  protected Configuration config = null;
   protected HDFSSession session = null;
   protected long lastSessionFetch = -1L;
   protected static final long timeToRelease = 300000L;
@@ -126,17 +125,6 @@ public class HDFSRepositoryConnector ext
     nameNodePort = configParams.getParameter("namenodeport");
     user = configParams.getParameter("user");
     
-    /*
-     * make Configuration
-     */
-    ClassLoader ocl = Thread.currentThread().getContextClassLoader();
-    try {
-      Thread.currentThread().setContextClassLoader(org.apache.hadoop.conf.Configuration.class.getClassLoader());
-      config = new Configuration();
-      config.set("fs.default.name", makeNameNodeURI(nameNodeHost, nameNodePort));
-    } finally {
-      Thread.currentThread().setContextClassLoader(ocl);
-    }
   }
 
   /* (non-Javadoc)
@@ -145,21 +133,16 @@ public class HDFSRepositoryConnector ext
   @Override
   public void disconnect() throws ManifoldCFException {
     closeSession();
-    config = null;
     user = null;
     nameNodeHost = null;
     nameNodePort = null;
     super.disconnect();
   }
 
-  protected static String makeNameNodeURI(String host, String port) {
-    return "hdfs://"+host+":"+port;
-  }
-  
   /**
    * Set up a session
    */
-  protected void getSession() throws ManifoldCFException, ServiceInterruption {
+  protected HDFSSession getSession() throws ManifoldCFException, ServiceInterruption {
     if (session == null) {
       if (StringUtils.isEmpty(nameNodeHost)) {
         throw new ManifoldCFException("Parameter namenodehost required but not set");
@@ -180,42 +163,43 @@ public class HDFSRepositoryConnector ext
       if (Logging.connectors.isDebugEnabled()) {
         Logging.connectors.debug("HDFS: User = '" + user + "'");
       }
+
+      String nameNode = "hdfs://"+nameNodeHost+":"+nameNodePort;
+
+      /*
+       * make Configuration
+       */
+      Configuration config = null;
+      ClassLoader ocl = Thread.currentThread().getContextClassLoader();
+      try {
+        Thread.currentThread().setContextClassLoader(org.apache.hadoop.conf.Configuration.class.getClassLoader());
+        config = new Configuration();
+        config.set("fs.default.name", nameNode);
+      } finally {
+        Thread.currentThread().setContextClassLoader(ocl);
+      }
       
-      long currentTime;
-      GetSessionThread t = new GetSessionThread();
+      GetSessionThread t = new GetSessionThread(nameNode,config,user);
       try {
         t.start();
-        t.join();
-        Throwable thr = t.getException();
-        if (thr != null) {
-          if (thr instanceof IOException) {
-            throw (IOException) thr;
-          } else if (thr instanceof URISyntaxException) {
-            throw (URISyntaxException) thr;
-          } else if (thr instanceof RuntimeException) {
-            throw (RuntimeException) thr;
-          } else {
-            throw (Error) thr;
-          }
-        }
+        t.finishUp();
       } catch (InterruptedException e) {
         t.interrupt();
         throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
       } catch (java.net.SocketTimeoutException e) {
-        Logging.connectors.warn("HDFS: Socket timeout: " + e.getMessage(), e);
         handleIOException(e);
       } catch (InterruptedIOException e) {
         t.interrupt();
-        throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+        handleIOException(e);
       } catch (URISyntaxException e) {
-        Logging.connectors.error("HDFS: URI syntax exception: " + e.getMessage(), e);
         handleURISyntaxException(e);
       } catch (IOException e) {
-        Logging.connectors.warn("HDFS: IO error: " + e.getMessage(), e);
         handleIOException(e);
       }
+      session = t.getResult();
     }
     lastSessionFetch = System.currentTimeMillis();
+    return session;
   }
 
   /**
@@ -238,32 +222,6 @@ public class HDFSRepositoryConnector ext
 
   /**
    * @throws ManifoldCFException
-   * @throws ServiceInterruption
-   */
-  protected void checkConnection() throws ManifoldCFException, ServiceInterruption {
-    getSession();
-    CheckConnectionThread t = new CheckConnectionThread();
-    try {
-      t.start();
-      t.finishUp();
-      return;
-    } catch (InterruptedException e) {
-      t.interrupt();
-      throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
-    } catch (java.net.SocketTimeoutException e) {
-      Logging.connectors.warn("HDFS: Socket timeout: " + e.getMessage(), e);
-      handleIOException(e);
-    } catch (InterruptedIOException e) {
-      t.interrupt();
-      throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
-    } catch (IOException e) {
-      Logging.connectors.warn("HDFS: Error checking repository: " + e.getMessage(), e);
-      handleIOException(e);
-    }
-  }
-
-  /**
-   * @throws ManifoldCFException
    */
   @Override
   public void poll() throws ManifoldCFException {
@@ -500,9 +458,6 @@ public class HDFSRepositoryConnector ext
         }
         data.addField("uri",uri);
 
-        // Make sure we have a session
-        getSession();
-
         // We will record document fetch as an activity
         long startTime = System.currentTimeMillis();
         String errorCode = "FAILED";
@@ -510,7 +465,7 @@ public class HDFSRepositoryConnector ext
         long fileSize = 0;
 
         try {
-          BackgroundStreamThread t = new BackgroundStreamThread(new Path(documentIdentifier));
+          BackgroundStreamThread t = new BackgroundStreamThread(getSession(),new Path(documentIdentifier));
           try {
             t.start();
             boolean wasInterrupted = false;
@@ -1661,6 +1616,7 @@ public class HDFSRepositoryConnector ext
     if (!(e instanceof java.net.SocketTimeoutException) && (e instanceof InterruptedIOException)) {
       throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
     }
+    Logging.connectors.warn("HDFS: IO exception: "+e.getMessage(),e);
     long currentTime = System.currentTimeMillis();
     throw new ServiceInterruption("IO exception: "+e.getMessage(), e, currentTime + 300000L, currentTime + 3 * 60 * 60000L,-1,false);
   }
@@ -1672,14 +1628,17 @@ public class HDFSRepositoryConnector ext
    */
   private static void handleURISyntaxException(URISyntaxException e) throws ManifoldCFException, ServiceInterruption {
     // Permanent problem
-    throw new ManifoldCFException("HDFS bad namenode specification: "+e.getMessage(), e);
+    Logging.connectors.error("HDFS: Bad namenode specification: "+e.getMessage(), e);
+    throw new ManifoldCFException("Bad namenode specification: "+e.getMessage(), e);
   }
 
-  protected class CheckConnectionThread extends Thread {
+  protected static class CheckConnectionThread extends Thread {
+    protected final HDFSSession session;
     protected Throwable exception = null;
 
-    public CheckConnectionThread() {
+    public CheckConnectionThread(HDFSSession session) {
       super();
+      this.session = session;
       setDaemon(true);
     }
 
@@ -1706,41 +1665,90 @@ public class HDFSRepositoryConnector ext
     }
   }
 
-  protected class GetSessionThread extends Thread {
+  /**
+   * @throws ManifoldCFException
+   * @throws ServiceInterruption
+   */
+  protected void checkConnection() throws ManifoldCFException, ServiceInterruption {
+    CheckConnectionThread t = new CheckConnectionThread(getSession());
+    try {
+      t.start();
+      t.finishUp();
+      return;
+    } catch (InterruptedException e) {
+      t.interrupt();
+      throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+    } catch (java.net.SocketTimeoutException e) {
+      handleIOException(e);
+    } catch (InterruptedIOException e) {
+      t.interrupt();
+      throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+    } catch (IOException e) {
+      handleIOException(e);
+    }
+  }
+
+  protected static class GetSessionThread extends Thread {
+    protected final String nameNode;
+    protected final Configuration config;
+    protected final String user;
     protected Throwable exception = null;
+    protected HDFSSession session;
 
-    public GetSessionThread() {
+    public GetSessionThread(String nameNode, Configuration config, String user) {
       super();
+      this.nameNode = nameNode;
+      this.config = config;
+      this.user = user;
       setDaemon(true);
     }
 
     public void run() {
       try {
         // Create a session
-        session = new HDFSSession(makeNameNodeURI(nameNodeHost,nameNodePort), config, user);
+        session = new HDFSSession(nameNode, config, user);
       } catch (Throwable e) {
         this.exception = e;
       }
     }
 
-    public Throwable getException() {
-      return exception;
+    public void finishUp()
+      throws InterruptedException, IOException, URISyntaxException {
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof IOException) {
+          throw (IOException) thr;
+        } else if (thr instanceof URISyntaxException) {
+          throw (URISyntaxException) thr;
+        } else if (thr instanceof RuntimeException) {
+          throw (RuntimeException) thr;
+        } else {
+          throw (Error) thr;
+        }
+      }
+    }
+    
+    public HDFSSession getResult() {
+      return session;
     }
   }
 
   protected FileStatus[] getChildren(Path path)
     throws ManifoldCFException, ServiceInterruption {
-    getSession();
+    GetChildrenThread t = new GetChildrenThread(getSession(), path);
     try {
-      GetChildrenThread t = new GetChildrenThread(path);
-      try {
-        t.start();
-        t.finishUp();
-      } catch (InterruptedException e) {
-        t.interrupt();
-        throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
-      }
+      t.start();
+      t.finishUp();
       return t.getResult();
+    } catch (InterruptedException e) {
+      t.interrupt();
+      throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+    } catch (java.net.SocketTimeoutException e) {
+      handleIOException(e);
+    } catch (InterruptedIOException e) {
+      t.interrupt();
+      handleIOException(e);
     } catch (IOException e) {
       handleIOException(e);
     }
@@ -1750,10 +1758,12 @@ public class HDFSRepositoryConnector ext
   protected class GetChildrenThread extends Thread {
     protected Throwable exception = null;
     protected FileStatus[] result = null;
+    protected final HDFSSession session;
     protected final Path path;
 
-    public GetChildrenThread(Path path) {
+    public GetChildrenThread(HDFSSession session, Path path) {
       super();
+      this.session = session;
       this.path = path;
       setDaemon(true);
     }
@@ -1790,32 +1800,35 @@ public class HDFSRepositoryConnector ext
 
   protected FileStatus getObject(Path path)
     throws ManifoldCFException, ServiceInterruption {
-    getSession();
+    GetObjectThread objt = new GetObjectThread(getSession(),path);
     try {
-      GetObjectThread objt = new GetObjectThread(path);
-      try {
-        objt.start();
-        objt.finishUp();
-      } catch (InterruptedException e) {
-        objt.interrupt();
-        throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
-      }
-
+      objt.start();
+      objt.finishUp();
       return objt.getResponse();
+    } catch (InterruptedException e) {
+      objt.interrupt();
+      throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+    } catch (java.net.SocketTimeoutException e) {
+      handleIOException(e);
+    } catch (InterruptedIOException e) {
+      objt.interrupt();
+      handleIOException(e);
     } catch (IOException e) {
       handleIOException(e);
     }
     return null;
   }
   
-  protected class GetObjectThread extends Thread {
+  protected static class GetObjectThread extends Thread {
+    protected final HDFSSession session;
     protected final Path nodeId;
     protected Throwable exception = null;
     protected FileStatus response = null;
 
-    public GetObjectThread(Path nodeId) {
+    public GetObjectThread(HDFSSession session, Path nodeId) {
       super();
       setDaemon(true);
+      this.session = session;
       this.nodeId = nodeId;
     }
 
@@ -1849,8 +1862,9 @@ public class HDFSRepositoryConnector ext
 
   }
 
-  protected class BackgroundStreamThread extends Thread
+  protected static class BackgroundStreamThread extends Thread
   {
+    protected final HDFSSession session;
     protected final Path nodeId;
     
     protected boolean abortThread = false;
@@ -1858,10 +1872,11 @@ public class HDFSRepositoryConnector ext
     protected InputStream sourceStream = null;
     protected XThreadInputStream threadStream = null;
     
-    public BackgroundStreamThread(Path nodeId)
+    public BackgroundStreamThread(HDFSSession session, Path nodeId)
     {
       super();
       setDaemon(true);
+      this.session = session;
       this.nodeId = nodeId;
     }