You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/07/02 19:08:36 UTC
svn commit: r1499023 - in /manifoldcf/trunk: ./
connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/
connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/
Author: kwright
Date: Tue Jul 2 17:08:35 2013
New Revision: 1499023
URL: http://svn.apache.org/r1499023
Log:
Fix for CONNECTORS-744.
Added:
manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java (with props)
Removed:
manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConstant.java
Modified:
manifoldcf/trunk/CHANGES.txt
manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConfig.java
manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java
manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputSpecs.java
manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/ParameterEnum.java
manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java
Modified: manifoldcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/CHANGES.txt?rev=1499023&r1=1499022&r2=1499023&view=diff
==============================================================================
--- manifoldcf/trunk/CHANGES.txt (original)
+++ manifoldcf/trunk/CHANGES.txt Tue Jul 2 17:08:35 2013
@@ -3,6 +3,9 @@ $Id$
======================= 1.3-dev =====================
+CONNECTORS-744: Use background threads in HDFS output connector.
+(Karl Wright)
+
CONNECTORS-742: Document HDFS connector.
(Karl Wright)
Modified: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConfig.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConfig.java?rev=1499023&r1=1499022&r2=1499023&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConfig.java (original)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConfig.java Tue Jul 2 17:08:35 2013
@@ -32,9 +32,9 @@ public class HDFSOutputConfig extends HD
/** Parameters used for the configuration */
final private static ParameterEnum[] CONFIGURATIONLIST = {
- ParameterEnum.NAMENODEHOST,
- ParameterEnum.NAMENODEPORT,
- ParameterEnum.USER
+ ParameterEnum.namenodehost,
+ ParameterEnum.namenodeport,
+ ParameterEnum.user
};
/** Build a set of ElasticSearchParameters by reading ConfigParams. If the
Modified: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java?rev=1499023&r1=1499022&r2=1499023&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java (original)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java Tue Jul 2 17:08:35 2013
@@ -29,9 +29,6 @@ import java.io.UnsupportedEncodingExcept
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -88,8 +85,12 @@ public class HDFSOutputConnector extends
/** Forward to the template to view the specification parameters for the job */
private static final String VIEW_SPECIFICATION_HTML = "viewSpecification.html";
- protected Configuration config = null;
- protected FileSystem fileSystem = null;
+ protected String nameNodeHost = null;
+ protected String nameNodePort = null;
+ protected String user = null;
+ protected HDFSSession session = null;
+ protected long lastSessionFetch = -1L;
+ protected static final long timeToRelease = 300000L;
/** Constructor.
*/
@@ -112,64 +113,113 @@ public class HDFSOutputConnector extends
@Override
public void connect(ConfigParams configParams) {
super.connect(configParams);
-
+ nameNodeHost = configParams.getParameter(ParameterEnum.namenodehost.name());
+ nameNodePort = configParams.getParameter(ParameterEnum.namenodeport.name());
+ user = configParams.getParameter(ParameterEnum.user.name());
}
/** Close the connection. Call this before discarding the connection.
*/
@Override
public void disconnect() throws ManifoldCFException {
- try {
- fileSystem.close();
- } catch(IOException ex) {
- throw new ManifoldCFException(ex);
- }
- config.clear();
+ closeSession();
+ nameNodeHost = null;
+ nameNodePort = null;
+ user = null;
super.disconnect();
}
- /** Set up a session */
- protected void getSession() throws ManifoldCFException, ServiceInterruption {
- String nameNodeHost = params.getParameter(ParameterEnum.NAMENODEHOST.name());
- if (nameNodeHost == null)
- throw new ManifoldCFException("Namenodehost must be specified");
-
- String nameNodePort = params.getParameter(ParameterEnum.NAMENODEPORT.name());
- if (nameNodePort == null)
- throw new ManifoldCFException("Namenodeport must be specified");
-
- String user = params.getParameter(ParameterEnum.USER.name());
- if (user == null)
- throw new ManifoldCFException("User must be specified");
-
- String nameNode = "hdfs://"+nameNodeHost+":"+nameNodePort;
+ /**
+ * @throws ManifoldCFException
+ */
+ @Override
+ public void poll() throws ManifoldCFException {
+ if (lastSessionFetch == -1L) {
+ return;
+ }
- /*
- * make Configuration
- */
- ClassLoader ocl = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(org.apache.hadoop.conf.Configuration.class.getClassLoader());
- config = new Configuration();
- config.set("fs.default.name", nameNode);
- } finally {
- Thread.currentThread().setContextClassLoader(ocl);
+ long currentTime = System.currentTimeMillis();
+ if (currentTime >= lastSessionFetch + timeToRelease) {
+ closeSession();
}
-
- /*
- * get connection to HDFS
- */
- try {
- fileSystem = FileSystem.get(new URI(nameNode), config, user);
- } catch (URISyntaxException e) {
- handleURISyntaxException(e);
- throw new ManifoldCFException(e.getMessage(),e);
- } catch (IOException e) {
- handleIOException(e);
- } catch (InterruptedException e) {
- throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
+ }
+
+ protected void closeSession()
+ throws ManifoldCFException {
+ if (session != null) {
+ try {
+ // This can in theory throw an IOException, so it is possible it is doing socket
+ // communication. In practice, it's unlikely that there's any real IO, so I'm
+ // NOT putting it in a background thread for now.
+ session.close();
+ } catch (InterruptedIOException e) {
+ throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+ } catch (IOException e) {
+ Logging.agents.warn("HDFS: Error closing connection: "+e.getMessage(),e);
+ // Eat the exception
+ } finally {
+ session = null;
+ lastSessionFetch = -1L;
+ }
}
+ }
+
+ /** Set up a session */
+ protected HDFSSession getSession() throws ManifoldCFException, ServiceInterruption {
+ if (session == null) {
+ String nameNodeHost = params.getParameter(ParameterEnum.namenodehost.name());
+ if (nameNodeHost == null)
+ throw new ManifoldCFException("Namenodehost must be specified");
+
+ String nameNodePort = params.getParameter(ParameterEnum.namenodeport.name());
+ if (nameNodePort == null)
+ throw new ManifoldCFException("Namenodeport must be specified");
+
+ String user = params.getParameter(ParameterEnum.user.name());
+ if (user == null)
+ throw new ManifoldCFException("User must be specified");
+
+ String nameNode = "hdfs://"+nameNodeHost+":"+nameNodePort;
+ //System.out.println("Namenode = '"+nameNode+"'");
+ /*
+ * make Configuration
+ */
+ Configuration config = null;
+ ClassLoader ocl = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(org.apache.hadoop.conf.Configuration.class.getClassLoader());
+ config = new Configuration();
+ config.set("fs.default.name", nameNode);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ocl);
+ }
+
+ /*
+ * get connection to HDFS
+ */
+ GetSessionThread t = new GetSessionThread(nameNode,config,user);
+ try {
+ t.start();
+ t.finishUp();
+ } catch (InterruptedException e) {
+ t.interrupt();
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ } catch (java.net.SocketTimeoutException e) {
+ handleIOException(e);
+ } catch (InterruptedIOException e) {
+ t.interrupt();
+ handleIOException(e);
+ } catch (URISyntaxException e) {
+ handleURISyntaxException(e);
+ } catch (IOException e) {
+ handleIOException(e);
+ }
+
+ session = t.getResult();
+ }
+ lastSessionFetch = System.currentTimeMillis();
+ return session;
}
/** Test the connection. Returns a string describing the connection integrity.
@@ -178,10 +228,12 @@ public class HDFSOutputConnector extends
@Override
public String check() throws ManifoldCFException {
try {
- getSession();
+ checkConnection();
return super.check();
} catch (ServiceInterruption e) {
- return "Transient error: "+e.getMessage();
+ return "Connection temporarily failed: " + e.getMessage();
+ } catch (ManifoldCFException e) {
+ return "Connection failed: " + e.getMessage();
}
}
@@ -218,17 +270,9 @@ public class HDFSOutputConnector extends
*/
@Override
public int addOrReplaceDocument(String documentURI, String outputDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities) throws ManifoldCFException, ServiceInterruption {
- // Establish a session
- getSession();
- HDFSOutputConfig config = getConfigParameters(null);
-
- HDFSOutputSpecs specs = null;
- InputStream input = null;
- FSDataOutputStream output = null;
- FileLock lock = null;
try {
- specs = new HDFSOutputSpecs(outputDescription);
+ HDFSOutputSpecs specs = new HDFSOutputSpecs(outputDescription);
/*
* make file path
@@ -241,54 +285,18 @@ public class HDFSOutputConnector extends
strBuff.append(documentURItoFilePath(documentURI));
Path path = new Path(strBuff.toString());
- /*
- * make directory
- */
- if (!fileSystem.exists(path.getParent())) {
- fileSystem.mkdirs(path.getParent());
- }
-
- /*
- * delete old file
- */
- if (fileSystem.exists(path)) {
- fileSystem.delete(path, true);
- }
-
- input = document.getBinaryStream();
- output = fileSystem.create(path);
-
- /*
- * write file
- */
- byte buf[] = new byte[65536];
- int len;
- while((len = input.read(buf)) != -1) {
- output.write(buf, 0, len);
- }
- output.flush();
+ Long startTime = new Long(System.currentTimeMillis());
+ createFile(path, document.getBinaryStream());
+ activities.recordActivity(startTime, INGEST_ACTIVITY, new Long(document.getBinaryLength()), documentURI, "OK", null);
+ return DOCUMENTSTATUS_ACCEPTED;
} catch (JSONException e) {
handleJSONException(e);
return DOCUMENTSTATUS_REJECTED;
} catch (URISyntaxException e) {
handleURISyntaxException(e);
return DOCUMENTSTATUS_REJECTED;
- } catch (IOException e) {
- handleIOException(e);
- return DOCUMENTSTATUS_REJECTED;
- } finally {
- try {
- input.close();
- } catch (IOException e) {
- }
- try {
- output.close();
- } catch (IOException e) {
- }
}
- activities.recordActivity(null, INGEST_ACTIVITY, new Long(document.getBinaryLength()), documentURI, "OK", null);
- return DOCUMENTSTATUS_ACCEPTED;
}
/** Remove a document using the connector.
@@ -300,14 +308,9 @@ public class HDFSOutputConnector extends
*/
@Override
public void removeDocument(String documentURI, String outputDescription, IOutputRemoveActivity activities) throws ManifoldCFException, ServiceInterruption {
- // Establish a session
- getSession();
- HDFSOutputConfig config = getConfigParameters(null);
-
- HDFSOutputSpecs specs = null;
try {
- specs = new HDFSOutputSpecs(outputDescription);
+ HDFSOutputSpecs specs = new HDFSOutputSpecs(outputDescription);
/*
* make path
@@ -319,22 +322,14 @@ public class HDFSOutputConnector extends
strBuff.append("/");
strBuff.append(documentURItoFilePath(documentURI));
Path path = new Path(strBuff.toString());
-
- /*
- * delete old file
- */
- if (fileSystem.exists(path)) {
- fileSystem.delete(path, true);
- }
+ Long startTime = new Long(System.currentTimeMillis());
+ deleteFile(path);
+ activities.recordActivity(startTime, REMOVE_ACTIVITY, null, documentURI, "OK", null);
} catch (JSONException e) {
handleJSONException(e);
} catch (URISyntaxException e) {
handleURISyntaxException(e);
- } catch (IOException e) {
- handleIOException(e);
}
-
- activities.recordActivity(null, REMOVE_ACTIVITY, null, documentURI, "OK", null);
}
/** Output the configuration header section.
@@ -437,7 +432,7 @@ public class HDFSOutputConnector extends
ConfigurationNode specNode = getSpecNode(os);
boolean bAdd = (specNode == null);
if (bAdd) {
- specNode = new SpecificationNode(HDFSOutputConstant.PARAM_ROOTPATH);
+ specNode = new SpecificationNode(ParameterEnum.rootpath.name());
}
HDFSOutputSpecs.contextToSpecNode(variableContext, specNode);
if (bAdd) {
@@ -467,7 +462,7 @@ public class HDFSOutputConnector extends
int l = os.getChildCount();
for (int i = 0; i < l; i++) {
SpecificationNode node = os.getChild(i);
- if (node.getType().equals(HDFSOutputConstant.PARAM_ROOTPATH)) {
+ if (node.getType().equals(ParameterEnum.rootpath.name())) {
return node;
}
}
@@ -591,7 +586,222 @@ public class HDFSOutputConnector extends
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
}
long currentTime = System.currentTimeMillis();
+ Logging.agents.warn("HDFS output connection: IO exception: "+e.getMessage(),e);
throw new ServiceInterruption("IO exception: "+e.getMessage(), e, currentTime + 300000L, currentTime + 3 * 60 * 60000L,-1,false);
}
-
+
+ protected static class CreateFileThread extends Thread {
+ protected final HDFSSession session;
+ protected final Path path;
+ protected final InputStream input;
+ protected Throwable exception = null;
+
+ public CreateFileThread(HDFSSession session, Path path, InputStream input) {
+ super();
+ this.session = session;
+ this.path = path;
+ this.input = input;
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ session.createFile(path,input);
+ } catch (Throwable e) {
+ this.exception = e;
+ }
+ }
+
+ public void finishUp() throws InterruptedException, IOException {
+ join();
+ Throwable thr = exception;
+ if (thr != null) {
+ if (thr instanceof IOException) {
+ throw (IOException) thr;
+ } else if (thr instanceof RuntimeException) {
+ throw (RuntimeException) thr;
+ } else {
+ throw (Error) thr;
+ }
+ }
+ }
+ }
+
+ protected void createFile(Path path, InputStream input)
+ throws ManifoldCFException, ServiceInterruption {
+ CreateFileThread t = new CreateFileThread(getSession(), path, input);
+ try {
+ t.start();
+ t.finishUp();
+ } catch (InterruptedException e) {
+ t.interrupt();
+ throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+ } catch (java.net.SocketTimeoutException e) {
+ handleIOException(e);
+ } catch (InterruptedIOException e) {
+ t.interrupt();
+ handleIOException(e);
+ } catch (IOException e) {
+ handleIOException(e);
+ }
+ }
+
+ protected static class DeleteFileThread extends Thread {
+ protected final HDFSSession session;
+ protected final Path path;
+ protected Throwable exception = null;
+
+ public DeleteFileThread(HDFSSession session, Path path) {
+ super();
+ this.session = session;
+ this.path = path;
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ session.deleteFile(path);
+ } catch (Throwable e) {
+ this.exception = e;
+ }
+ }
+
+ public void finishUp() throws InterruptedException, IOException {
+ join();
+ Throwable thr = exception;
+ if (thr != null) {
+ if (thr instanceof IOException) {
+ throw (IOException) thr;
+ } else if (thr instanceof RuntimeException) {
+ throw (RuntimeException) thr;
+ } else {
+ throw (Error) thr;
+ }
+ }
+ }
+ }
+
+ protected void deleteFile(Path path)
+ throws ManifoldCFException, ServiceInterruption {
+ // Establish a session
+ DeleteFileThread t = new DeleteFileThread(getSession(),path);
+ try {
+ t.start();
+ t.finishUp();
+ } catch (InterruptedException e) {
+ t.interrupt();
+ throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+ } catch (java.net.SocketTimeoutException e) {
+ handleIOException(e);
+ } catch (InterruptedIOException e) {
+ t.interrupt();
+ handleIOException(e);
+ } catch (IOException e) {
+ handleIOException(e);
+ }
+ }
+
+
+ protected static class CheckConnectionThread extends Thread {
+ protected final HDFSSession session;
+ protected Throwable exception = null;
+
+ public CheckConnectionThread(HDFSSession session) {
+ super();
+ this.session = session;
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ session.getRepositoryInfo();
+ } catch (Throwable e) {
+ this.exception = e;
+ }
+ }
+
+ public void finishUp() throws InterruptedException, IOException {
+ join();
+ Throwable thr = exception;
+ if (thr != null) {
+ if (thr instanceof IOException) {
+ throw (IOException) thr;
+ } else if (thr instanceof RuntimeException) {
+ throw (RuntimeException) thr;
+ } else {
+ throw (Error) thr;
+ }
+ }
+ }
+ }
+
+ /**
+ * @throws ManifoldCFException
+ * @throws ServiceInterruption
+ */
+ protected void checkConnection() throws ManifoldCFException, ServiceInterruption {
+ CheckConnectionThread t = new CheckConnectionThread(getSession());
+ try {
+ t.start();
+ t.finishUp();
+ return;
+ } catch (InterruptedException e) {
+ t.interrupt();
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ } catch (java.net.SocketTimeoutException e) {
+ handleIOException(e);
+ } catch (InterruptedIOException e) {
+ t.interrupt();
+ handleIOException(e);
+ } catch (IOException e) {
+ handleIOException(e);
+ }
+ }
+
+ protected static class GetSessionThread extends Thread {
+ protected final String nameNode;
+ protected final Configuration config;
+ protected final String user;
+ protected Throwable exception = null;
+ protected HDFSSession session = null;
+
+ public GetSessionThread(String nameNode, Configuration config, String user) {
+ super();
+ this.nameNode = nameNode;
+ this.config = config;
+ this.user = user;
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ // Create a session
+ session = new HDFSSession(nameNode, config, user);
+ } catch (Throwable e) {
+ this.exception = e;
+ }
+ }
+
+ public void finishUp()
+ throws InterruptedException, IOException, URISyntaxException {
+ join();
+ Throwable thr = exception;
+ if (thr != null) {
+ if (thr instanceof IOException) {
+ throw (IOException) thr;
+ } else if (thr instanceof URISyntaxException) {
+ throw (URISyntaxException) thr;
+ } else if (thr instanceof RuntimeException) {
+ throw (RuntimeException) thr;
+ } else {
+ throw (Error) thr;
+ }
+ }
+ }
+
+ public HDFSSession getResult() {
+ return session;
+ }
+ }
+
}
Modified: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputSpecs.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputSpecs.java?rev=1499023&r1=1499022&r2=1499023&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputSpecs.java (original)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputSpecs.java Tue Jul 2 17:08:35 2013
@@ -39,7 +39,7 @@ public class HDFSOutputSpecs extends HDF
private static final long serialVersionUID = 1145652730572662025L;
final public static ParameterEnum[] SPECIFICATIONLIST = {
- ParameterEnum.ROOTPATH
+ ParameterEnum.rootpath
};
private String rootPath;
@@ -118,7 +118,7 @@ public class HDFSOutputSpecs extends HDF
* @return
*/
public String getRootPath() {
- return get(ParameterEnum.ROOTPATH);
+ return get(ParameterEnum.rootpath);
}
/**
@@ -142,7 +142,7 @@ public class HDFSOutputSpecs extends HDF
}
return set;
} catch (IOException e) {
- throw new ManifoldCFException(e);
+ throw new ManifoldCFException(e.getMessage(),e);
} finally {
if (br != null) {
IOUtils.closeQuietly(br);
Added: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java?rev=1499023&view=auto
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java (added)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java Tue Jul 2 17:08:35 2013
@@ -0,0 +1,119 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.manifoldcf.agents.output.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.manifoldcf.core.common.*;
+
+import java.io.InputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ */
+public class HDFSSession {
+
+ private FileSystem fileSystem;
+ private final String nameNode;
+ private final Configuration config;
+ private final String user;
+
+ public HDFSSession(String nameNode, Configuration config, String user) throws URISyntaxException, IOException, InterruptedException {
+ this.nameNode = nameNode;
+ this.config = config;
+ this.user = user;
+ fileSystem = FileSystem.get(new URI(nameNode), config, user);
+ }
+
+ public Map<String, String> getRepositoryInfo() {
+ Map<String, String> info = new HashMap<String, String>();
+
+ info.put("Name Node", nameNode);
+ info.put("Config", config.toString());
+ info.put("User", user);
+ info.put("Canonical Service Name", fileSystem.getCanonicalServiceName());
+ info.put("Default Block Size", Long.toString(fileSystem.getDefaultBlockSize()));
+ info.put("Default Replication", Short.toString(fileSystem.getDefaultReplication()));
+ info.put("Home Directory", fileSystem.getHomeDirectory().toUri().toString());
+ info.put("Working Directory", fileSystem.getWorkingDirectory().toUri().toString());
+ return info;
+ }
+
+ public void deleteFile(Path path)
+ throws IOException {
+ if (fileSystem.exists(path)) {
+ fileSystem.delete(path, true);
+ }
+ }
+
+ public void createFile(Path path, InputStream input)
+ throws IOException {
+ /*
+ * make directory
+ */
+ if (!fileSystem.exists(path.getParent())) {
+ fileSystem.mkdirs(path.getParent());
+ }
+
+ /*
+ * delete old file
+ */
+ if (fileSystem.exists(path)) {
+ fileSystem.delete(path, true);
+ }
+
+ FSDataOutputStream output = fileSystem.create(path);
+ try {
+ /*
+ * write file
+ */
+ byte buf[] = new byte[65536];
+ int len;
+ while((len = input.read(buf)) != -1) {
+ output.write(buf, 0, len);
+ }
+ output.flush();
+ } finally {
+ output.close();
+ }
+
+ // Do NOT close input; it's closed by the caller.
+ }
+
+ public URI getUri() {
+ return fileSystem.getUri();
+ }
+
+ public void close() throws IOException {
+ fileSystem.close();
+ }
+}
Propchange: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSSession.java
------------------------------------------------------------------------------
svn:keywords = Id
Modified: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/ParameterEnum.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/ParameterEnum.java?rev=1499023&r1=1499022&r2=1499023&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/ParameterEnum.java (original)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/ParameterEnum.java Tue Jul 2 17:08:35 2013
@@ -24,10 +24,10 @@ import java.util.Map;
/** Parameters constants */
public enum ParameterEnum {
- NAMENODEHOST("localhost"),
- NAMENODEPORT("9000"),
- USER(""),
- ROOTPATH("");
+ namenodehost("localhost"),
+ namenodeport("9000"),
+ user(""),
+ rootpath("");
final protected String defaultValue;
Modified: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java?rev=1499023&r1=1499022&r2=1499023&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java (original)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java Tue Jul 2 17:08:35 2013
@@ -57,7 +57,6 @@ public class HDFSRepositoryConnector ext
protected String nameNodeHost = null;
protected String nameNodePort = null;
protected String user = null;
- protected Configuration config = null;
protected HDFSSession session = null;
protected long lastSessionFetch = -1L;
protected static final long timeToRelease = 300000L;
@@ -126,17 +125,6 @@ public class HDFSRepositoryConnector ext
nameNodePort = configParams.getParameter("namenodeport");
user = configParams.getParameter("user");
- /*
- * make Configuration
- */
- ClassLoader ocl = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(org.apache.hadoop.conf.Configuration.class.getClassLoader());
- config = new Configuration();
- config.set("fs.default.name", makeNameNodeURI(nameNodeHost, nameNodePort));
- } finally {
- Thread.currentThread().setContextClassLoader(ocl);
- }
}
/* (non-Javadoc)
@@ -145,21 +133,16 @@ public class HDFSRepositoryConnector ext
@Override
public void disconnect() throws ManifoldCFException {
closeSession();
- config = null;
user = null;
nameNodeHost = null;
nameNodePort = null;
super.disconnect();
}
- protected static String makeNameNodeURI(String host, String port) {
- return "hdfs://"+host+":"+port;
- }
-
/**
* Set up a session
*/
- protected void getSession() throws ManifoldCFException, ServiceInterruption {
+ protected HDFSSession getSession() throws ManifoldCFException, ServiceInterruption {
if (session == null) {
if (StringUtils.isEmpty(nameNodeHost)) {
throw new ManifoldCFException("Parameter namenodehost required but not set");
@@ -180,42 +163,43 @@ public class HDFSRepositoryConnector ext
if (Logging.connectors.isDebugEnabled()) {
Logging.connectors.debug("HDFS: User = '" + user + "'");
}
+
+ String nameNode = "hdfs://"+nameNodeHost+":"+nameNodePort;
+
+ /*
+ * make Configuration
+ */
+ Configuration config = null;
+ ClassLoader ocl = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(org.apache.hadoop.conf.Configuration.class.getClassLoader());
+ config = new Configuration();
+ config.set("fs.default.name", nameNode);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ocl);
+ }
- long currentTime;
- GetSessionThread t = new GetSessionThread();
+ GetSessionThread t = new GetSessionThread(nameNode,config,user);
try {
t.start();
- t.join();
- Throwable thr = t.getException();
- if (thr != null) {
- if (thr instanceof IOException) {
- throw (IOException) thr;
- } else if (thr instanceof URISyntaxException) {
- throw (URISyntaxException) thr;
- } else if (thr instanceof RuntimeException) {
- throw (RuntimeException) thr;
- } else {
- throw (Error) thr;
- }
- }
+ t.finishUp();
} catch (InterruptedException e) {
t.interrupt();
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
} catch (java.net.SocketTimeoutException e) {
- Logging.connectors.warn("HDFS: Socket timeout: " + e.getMessage(), e);
handleIOException(e);
} catch (InterruptedIOException e) {
t.interrupt();
- throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ handleIOException(e);
} catch (URISyntaxException e) {
- Logging.connectors.error("HDFS: URI syntax exception: " + e.getMessage(), e);
handleURISyntaxException(e);
} catch (IOException e) {
- Logging.connectors.warn("HDFS: IO error: " + e.getMessage(), e);
handleIOException(e);
}
+ session = t.getResult();
}
lastSessionFetch = System.currentTimeMillis();
+ return session;
}
/**
@@ -238,32 +222,6 @@ public class HDFSRepositoryConnector ext
/**
* @throws ManifoldCFException
- * @throws ServiceInterruption
- */
- protected void checkConnection() throws ManifoldCFException, ServiceInterruption {
- getSession();
- CheckConnectionThread t = new CheckConnectionThread();
- try {
- t.start();
- t.finishUp();
- return;
- } catch (InterruptedException e) {
- t.interrupt();
- throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
- } catch (java.net.SocketTimeoutException e) {
- Logging.connectors.warn("HDFS: Socket timeout: " + e.getMessage(), e);
- handleIOException(e);
- } catch (InterruptedIOException e) {
- t.interrupt();
- throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
- } catch (IOException e) {
- Logging.connectors.warn("HDFS: Error checking repository: " + e.getMessage(), e);
- handleIOException(e);
- }
- }
-
- /**
- * @throws ManifoldCFException
*/
@Override
public void poll() throws ManifoldCFException {
@@ -500,9 +458,6 @@ public class HDFSRepositoryConnector ext
}
data.addField("uri",uri);
- // Make sure we have a session
- getSession();
-
// We will record document fetch as an activity
long startTime = System.currentTimeMillis();
String errorCode = "FAILED";
@@ -510,7 +465,7 @@ public class HDFSRepositoryConnector ext
long fileSize = 0;
try {
- BackgroundStreamThread t = new BackgroundStreamThread(new Path(documentIdentifier));
+ BackgroundStreamThread t = new BackgroundStreamThread(getSession(),new Path(documentIdentifier));
try {
t.start();
boolean wasInterrupted = false;
@@ -1661,6 +1616,7 @@ public class HDFSRepositoryConnector ext
if (!(e instanceof java.net.SocketTimeoutException) && (e instanceof InterruptedIOException)) {
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
}
+ Logging.connectors.warn("HDFS: IO exception: "+e.getMessage(),e);
long currentTime = System.currentTimeMillis();
throw new ServiceInterruption("IO exception: "+e.getMessage(), e, currentTime + 300000L, currentTime + 3 * 60 * 60000L,-1,false);
}
@@ -1672,14 +1628,17 @@ public class HDFSRepositoryConnector ext
*/
private static void handleURISyntaxException(URISyntaxException e) throws ManifoldCFException, ServiceInterruption {
// Permanent problem
- throw new ManifoldCFException("HDFS bad namenode specification: "+e.getMessage(), e);
+ Logging.connectors.error("HDFS: Bad namenode specification: "+e.getMessage(), e);
+ throw new ManifoldCFException("Bad namenode specification: "+e.getMessage(), e);
}
- protected class CheckConnectionThread extends Thread {
+ protected static class CheckConnectionThread extends Thread {
+ protected final HDFSSession session;
protected Throwable exception = null;
- public CheckConnectionThread() {
+ public CheckConnectionThread(HDFSSession session) {
super();
+ this.session = session;
setDaemon(true);
}
@@ -1706,41 +1665,90 @@ public class HDFSRepositoryConnector ext
}
}
- protected class GetSessionThread extends Thread {
+ /**
+ * @throws ManifoldCFException
+ * @throws ServiceInterruption
+ */
+ protected void checkConnection() throws ManifoldCFException, ServiceInterruption {
+ CheckConnectionThread t = new CheckConnectionThread(getSession());
+ try {
+ t.start();
+ t.finishUp();
+ return;
+ } catch (InterruptedException e) {
+ t.interrupt();
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ } catch (java.net.SocketTimeoutException e) {
+ handleIOException(e);
+ } catch (InterruptedIOException e) {
+ t.interrupt();
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ } catch (IOException e) {
+ handleIOException(e);
+ }
+ }
+
+ protected static class GetSessionThread extends Thread {
+ protected final String nameNode;
+ protected final Configuration config;
+ protected final String user;
protected Throwable exception = null;
+ protected HDFSSession session;
- public GetSessionThread() {
+ public GetSessionThread(String nameNode, Configuration config, String user) {
super();
+ this.nameNode = nameNode;
+ this.config = config;
+ this.user = user;
setDaemon(true);
}
public void run() {
try {
// Create a session
- session = new HDFSSession(makeNameNodeURI(nameNodeHost,nameNodePort), config, user);
+ session = new HDFSSession(nameNode, config, user);
} catch (Throwable e) {
this.exception = e;
}
}
- public Throwable getException() {
- return exception;
+ public void finishUp()
+ throws InterruptedException, IOException, URISyntaxException {
+ join();
+ Throwable thr = exception;
+ if (thr != null) {
+ if (thr instanceof IOException) {
+ throw (IOException) thr;
+ } else if (thr instanceof URISyntaxException) {
+ throw (URISyntaxException) thr;
+ } else if (thr instanceof RuntimeException) {
+ throw (RuntimeException) thr;
+ } else {
+ throw (Error) thr;
+ }
+ }
+ }
+
+ public HDFSSession getResult() {
+ return session;
}
}
protected FileStatus[] getChildren(Path path)
throws ManifoldCFException, ServiceInterruption {
- getSession();
+ GetChildrenThread t = new GetChildrenThread(getSession(), path);
try {
- GetChildrenThread t = new GetChildrenThread(path);
- try {
- t.start();
- t.finishUp();
- } catch (InterruptedException e) {
- t.interrupt();
- throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
- }
+ t.start();
+ t.finishUp();
return t.getResult();
+ } catch (InterruptedException e) {
+ t.interrupt();
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ } catch (java.net.SocketTimeoutException e) {
+ handleIOException(e);
+ } catch (InterruptedIOException e) {
+ t.interrupt();
+ handleIOException(e);
} catch (IOException e) {
handleIOException(e);
}
@@ -1750,10 +1758,12 @@ public class HDFSRepositoryConnector ext
protected class GetChildrenThread extends Thread {
protected Throwable exception = null;
protected FileStatus[] result = null;
+ protected final HDFSSession session;
protected final Path path;
- public GetChildrenThread(Path path) {
+ public GetChildrenThread(HDFSSession session, Path path) {
super();
+ this.session = session;
this.path = path;
setDaemon(true);
}
@@ -1790,32 +1800,35 @@ public class HDFSRepositoryConnector ext
protected FileStatus getObject(Path path)
throws ManifoldCFException, ServiceInterruption {
- getSession();
+ GetObjectThread objt = new GetObjectThread(getSession(),path);
try {
- GetObjectThread objt = new GetObjectThread(path);
- try {
- objt.start();
- objt.finishUp();
- } catch (InterruptedException e) {
- objt.interrupt();
- throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
- }
-
+ objt.start();
+ objt.finishUp();
return objt.getResponse();
+ } catch (InterruptedException e) {
+ objt.interrupt();
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ } catch (java.net.SocketTimeoutException e) {
+ handleIOException(e);
+ } catch (InterruptedIOException e) {
+ objt.interrupt();
+ handleIOException(e);
} catch (IOException e) {
handleIOException(e);
}
return null;
}
- protected class GetObjectThread extends Thread {
+ protected static class GetObjectThread extends Thread {
+ protected final HDFSSession session;
protected final Path nodeId;
protected Throwable exception = null;
protected FileStatus response = null;
- public GetObjectThread(Path nodeId) {
+ public GetObjectThread(HDFSSession session, Path nodeId) {
super();
setDaemon(true);
+ this.session = session;
this.nodeId = nodeId;
}
@@ -1849,8 +1862,9 @@ public class HDFSRepositoryConnector ext
}
- protected class BackgroundStreamThread extends Thread
+ protected static class BackgroundStreamThread extends Thread
{
+ protected final HDFSSession session;
protected final Path nodeId;
protected boolean abortThread = false;
@@ -1858,10 +1872,11 @@ public class HDFSRepositoryConnector ext
protected InputStream sourceStream = null;
protected XThreadInputStream threadStream = null;
- public BackgroundStreamThread(Path nodeId)
+ public BackgroundStreamThread(HDFSSession session, Path nodeId)
{
super();
setDaemon(true);
+ this.session = session;
this.nodeId = nodeId;
}