You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by mi...@apache.org on 2013/06/25 10:23:03 UTC
svn commit: r1496377 - in
/manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf:
agents/output/hdfs/ crawler/connectors/hdfs/
Author: minoru
Date: Tue Jun 25 08:23:03 2013
New Revision: 1496377
URL: http://svn.apache.org/r1496377
Log: (empty)
Added:
manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSSession.java (with props)
Modified:
manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java
manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java
Modified: manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java?rev=1496377&r1=1496376&r2=1496377&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/agents/output/hdfs/HDFSOutputConnector.java Tue Jun 25 08:23:03 2013
@@ -44,6 +44,7 @@ import org.apache.manifoldcf.agents.inte
import org.apache.manifoldcf.agents.interfaces.OutputSpecification;
import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
+import org.apache.manifoldcf.agents.system.Logging;
import org.apache.manifoldcf.agents.output.BaseOutputConnector;
import org.apache.manifoldcf.agents.output.hdfs.HDFSOutputParam.ParameterEnum;
import org.apache.manifoldcf.core.interfaces.ConfigParams;
@@ -134,11 +135,11 @@ public class HDFSOutputConnector extends
try {
fileSystem = FileSystem.get(new URI(nameNode), config, user);
} catch (URISyntaxException e) {
- Logging.connectors.warn("HDFS: Node name error: " + e.getMessage(), e);
+ Logging.agents.warn("HDFS: Node name error: " + e.getMessage(), e);
} catch (IOException e) {
- Logging.connectors.warn("HDFS: File system error: " + e.getMessage(), e);
+ Logging.agents.warn("HDFS: File system error: " + e.getMessage(), e);
} catch (InterruptedException e) {
- Logging.connectors.warn("HDFS: File system error: " + e.getMessage(), e);
+ Logging.agents.warn("HDFS: File system error: " + e.getMessage(), e);
}
}
Modified: manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java?rev=1496377&r1=1496376&r2=1496377&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java Tue Jun 25 08:23:03 2013
@@ -18,16 +18,24 @@
*/
package org.apache.manifoldcf.crawler.connectors.hdfs;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
+import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
import org.apache.manifoldcf.core.interfaces.*;
-import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.connectors.hdfs.HDFSSession;
+import org.apache.manifoldcf.crawler.connectors.hdfs.Messages;
import org.apache.manifoldcf.crawler.interfaces.*;
import org.apache.manifoldcf.crawler.system.Logging;
+import org.apache.manifoldcf.core.common.XThreadInputStream;
+import org.apache.manifoldcf.core.common.XThreadStringBuffer;
import org.apache.manifoldcf.core.extmimemap.ExtensionMimeMap;
+
+import java.security.GeneralSecurityException;
import java.util.*;
import java.io.*;
import java.net.URI;
@@ -55,8 +63,16 @@ public class HDFSRepositoryConnector ext
// Local data
// protected File rootDirectory = null;
+ // Nodes and attributes
+ //private static final String JOB_STARTPOINT_NODE_TYPE = "startpoint";
+ //private static final String JOB_PATH_ATTRIBUTE = "path";
+
+ protected String nameNode = null;
+ protected String user = null;
protected Configuration config = null;
- protected FileSystem fileSystem = null;
+ protected HDFSSession session = null;
+ protected long lastSessionFetch = -1L;
+ protected static final long timeToRelease = 300000L;
/*
* Constructor.
@@ -65,16 +81,62 @@ public class HDFSRepositoryConnector ext
{
}
- /* (non-Javadoc)
+ /** Tell the world what model this connector uses for getDocumentIdentifiers().
+ * This must return a model value as specified above.
+ *@return the model type value.
+ */
+ @Override
+ public int getConnectorModel()
+ {
+ return MODEL_CHAINED_ADD_CHANGE;
+ }
+
+/** Return the list of relationship types that this connector recognizes.
+ *@return the list.
+ */
+ @Override
+ public String[] getRelationshipTypes()
+ {
+ return new String[]{RELATIONSHIP_CHILD};
+ }
+
+ /** List the activities we might report on.
+ */
+ @Override
+ public String[] getActivitiesList()
+ {
+ return activitiesList;
+ }
+
+ /** For any given document, list the bins that it is a member of.
+ */
+ @Override
+ public String[] getBinNames(String documentIdentifier)
+ {
+ return new String[]{"HDFS"};
+ }
+
+ /**
+ * Get the maximum number of documents to amalgamate together into one
+ * batch, for this connector.
+ *
+ * @return the maximum number. 0 indicates "unlimited".
+ */
+ @Override
+ public int getMaxDocumentRequest() {
+ return 1;
+ }
+
+/* (non-Javadoc)
* @see org.apache.manifoldcf.core.connector.BaseConnector#connect(org.apache.manifoldcf.core.interfaces.ConfigParams)
*/
@Override
public void connect(ConfigParams configParams) {
super.connect(configParams);
- String nameNode = configParams.getParameter("namenode");
+ nameNode = configParams.getParameter("namenode");
- String user = configParams.getParameter("user");
+ user = configParams.getParameter("user");
/*
* make Configuration
@@ -87,170 +149,248 @@ public class HDFSRepositoryConnector ext
} finally {
Thread.currentThread().setContextClassLoader(ocl);
}
-
- /*
- * get connection to HDFS
- */
- try {
- fileSystem = FileSystem.get(new URI(nameNode), config, user);
- } catch (URISyntaxException e) {
- Logging.connectors.warn("HDFS: Node name error: " + e.getMessage(), e);
- } catch (IOException e) {
- Logging.connectors.warn("HDFS: File system error: " + e.getMessage(), e);
- } catch (InterruptedException e) {
- Logging.connectors.warn("HDFS: File system error: " + e.getMessage(), e);
- }
}
- /* (non-Javadoc)
+/* (non-Javadoc)
* @see org.apache.manifoldcf.core.connector.BaseConnector#disconnect()
*/
@Override
public void disconnect() throws ManifoldCFException {
- try {
- fileSystem.close();
- } catch(IOException ex) {
- throw new ManifoldCFException(ex);
+ if (session != null) {
+ session.close();
+ session = null;
+ lastSessionFetch = -1L;
}
+
config.clear();
+ config = null;
+ user = null;
+ nameNode = null;
super.disconnect();
}
- /** Tell the world what model this connector uses for getDocumentIdentifiers().
- * This must return a model value as specified above.
- *@return the model type value.
+/**
+ * Set up a session
*/
- @Override
- public int getConnectorModel()
- {
- return MODEL_CHAINED_ADD_CHANGE;
+ protected void getSession() throws ManifoldCFException, ServiceInterruption {
+ if (session == null) {
+ if (StringUtils.isEmpty(nameNode)) {
+ throw new ManifoldCFException("Parameter namenode required but not set");
+ }
+ if (Logging.connectors.isDebugEnabled()) {
+ Logging.connectors.debug("HDFS: NameNode = '" + nameNode + "'");
+ }
+
+ if (StringUtils.isEmpty(user)) {
+ throw new ManifoldCFException("Parameter user required but not set");
+ }
+ if (Logging.connectors.isDebugEnabled()) {
+ Logging.connectors.debug("HDFS: User = '" + user + "'");
+ }
+
+ long currentTime;
+ GetSessionThread t = new GetSessionThread();
+ try {
+ t.start();
+ t.join();
+ Throwable thr = t.getException();
+ if (thr != null) {
+ if (thr instanceof IOException) {
+ throw (IOException) thr;
+ } else if (thr instanceof GeneralSecurityException) {
+ throw (GeneralSecurityException) thr;
+ } else {
+ throw (Error) thr;
+ }
+ }
+ } catch (InterruptedException e) {
+ t.interrupt();
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ } catch (java.net.SocketTimeoutException e) {
+ Logging.connectors.warn("HDFS: Socket timeout: " + e.getMessage(), e);
+ handleIOException(e);
+ } catch (InterruptedIOException e) {
+ t.interrupt();
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ } catch (GeneralSecurityException e) {
+ Logging.connectors.error("HDFS: " + "General security error initializing transport: " + e.getMessage(), e);
+ handleGeneralSecurityException(e);
+ } catch (IOException e) {
+ Logging.connectors.warn("HDFS: IO error: " + e.getMessage(), e);
+ handleIOException(e);
+ }
+ }
+ lastSessionFetch = System.currentTimeMillis();
}
- /** Return the list of relationship types that this connector recognizes.
- *@return the list.
+/**
+ * Test the connection. Returns a string describing the connection
+ * integrity.
+ *
+ * @return the connection's status as a displayable string.
*/
@Override
- public String[] getRelationshipTypes()
- {
- return new String[]{RELATIONSHIP_CHILD};
+ public String check() throws ManifoldCFException {
+ try {
+ checkConnection();
+ return super.check();
+ } catch (ServiceInterruption e) {
+ return "Connection temporarily failed: " + e.getMessage();
+ } catch (ManifoldCFException e) {
+ return "Connection failed: " + e.getMessage();
+ }
}
- /** List the activities we might report on.
+ /**
+ * @throws ManifoldCFException
+ * @throws ServiceInterruption
*/
- @Override
- public String[] getActivitiesList()
- {
- return activitiesList;
+ protected void checkConnection() throws ManifoldCFException, ServiceInterruption {
+ getSession();
+ CheckConnectionThread t = new CheckConnectionThread();
+ try {
+ t.start();
+ t.join();
+ Throwable thr = t.getException();
+ if (thr != null) {
+ if (thr instanceof IOException) {
+ throw (IOException) thr;
+ } else if (thr instanceof RuntimeException) {
+ throw (RuntimeException) thr;
+ } else {
+ throw (Error) thr;
+ }
+ }
+ return;
+ } catch (InterruptedException e) {
+ t.interrupt();
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ } catch (java.net.SocketTimeoutException e) {
+ Logging.connectors.warn("HDFS: Socket timeout: " + e.getMessage(), e);
+ handleIOException(e);
+ } catch (InterruptedIOException e) {
+ t.interrupt();
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ } catch (IOException e) {
+ Logging.connectors.warn("HDFS: Error checking repository: " + e.getMessage(), e);
+ handleIOException(e);
+ }
}
- /** For any given document, list the bins that it is a member of.
+ /**
+ * @throws ManifoldCFException
*/
@Override
- public String[] getBinNames(String documentIdentifier)
- {
- /*
- // Note: This code is for testing, so we can see how documents behave when they are in various kinds of bin situations.
- // The testing model is that there are documents belonging to "SLOW", to "FAST", or both to "SLOW" and "FAST" bins.
- // The connector chooses which bins to assign a document to based on the identifier (which is the document's path), so
- // this is something that should NOT be duplicated by other connector implementers.
- if (documentIdentifier.indexOf("/BOTH/") != -1 || (documentIdentifier.indexOf("/SLOW/") != -1 && documentIdentifier.indexOf("/FAST/") != -1))
- return new String[]{"SLOW","FAST"};
- if (documentIdentifier.indexOf("/SLOW/") != -1)
- return new String[]{"SLOW"};
- if (documentIdentifier.indexOf("/FAST/") != -1)
- return new String[]{"FAST"};
- */
- return new String[]{""};
+ public void poll() throws ManifoldCFException {
+ if (lastSessionFetch == -1L) {
+ return;
+ }
+
+ long currentTime = System.currentTimeMillis();
+ if (currentTime >= lastSessionFetch + timeToRelease) {
+ if (session != null) {
+ session.close();
+ session = null;
+ lastSessionFetch = -1L;
+ }
+ }
}
- /** Convert a document identifier to a URI. The URI is the URI that will be the unique key from
- * the search index, and will be presented to the user as part of the search results.
- *@param filePath is the document filePath.
- *@param repositoryPath is the document repositoryPath.
- *@return the document uri.
- */
- protected String convertToURI(String documentIdentifier, String[] repositoryPaths)
- throws ManifoldCFException
- {
- //
- // Note well: This MUST be a legal URI!!!
- try
- {
- String path = new Path(documentIdentifier).toString();
- for (String repositoryPath : repositoryPaths) {
- if (path.startsWith(repositoryPath)) {
- StringBuffer sb = new StringBuffer();
- path = path.replaceFirst(repositoryPath, "");
- if (path.startsWith("/")) {
- path = path.replaceFirst("/", "");
- }
- String[] tmp = path.split("/", 3);
- String scheme = "";
- String host = "";
- String other = "";
- try {
- scheme = tmp[0];
- } catch (ArrayIndexOutOfBoundsException e) {
- scheme = "hdfs";
- }
- try {
- host = tmp[1];
- } catch (ArrayIndexOutOfBoundsException e) {
- host = "localhost:9000";
- }
+ /**
+ * Queue "seed" documents. Seed documents are the starting places for
+ * crawling activity. Documents are seeded when this method calls
+ * appropriate methods in the passed in ISeedingActivity object.
+ *
+ * This method can choose to find repository changes that happen only during
+ * the specified time interval. The seeds recorded by this method will be
+ * viewed by the framework based on what the getConnectorModel() method
+ * returns.
+ *
+ * It is not a big problem if the connector chooses to create more seeds
+ * than are strictly necessary; it is merely a question of overall work
+ * required.
+ *
+ * The times passed to this method may be interpreted for greatest
+ * efficiency. The time ranges any given job uses with this connector will
+ * not overlap, but will proceed starting at 0 and going to the "current
+ * time", each time the job is run. For continuous crawling jobs, this
+ * method will be called once, when the job starts, and at various periodic
+ * intervals as the job executes.
+ *
+ * When a job's specification is changed, the framework automatically resets
+ * the seeding start time to 0. The seeding start time may also be set to 0
+ * on each job run, depending on the connector model returned by
+ * getConnectorModel().
+ *
+ * Note that it is always ok to send MORE documents rather than less to this
+ * method.
+ *
+ * @param activities is the interface this method should use to perform
+ * whatever framework actions are desired.
+ * @param spec is a document specification (that comes from the job).
+ * @param startTime is the beginning of the time range to consider,
+ * inclusive.
+ * @param endTime is the end of the time range to consider, exclusive.
+ * @param jobMode is an integer describing how the job is being run, whether
+ * continuous or once-only.
+ */
+ @Override
+ public void addSeedDocuments(ISeedingActivity activities,
+ DocumentSpecification spec, long startTime, long endTime, int jobMode)
+ throws ManifoldCFException, ServiceInterruption {
+
+ String path = StringUtils.EMPTY;
+ int i = 0;
+ while (i < spec.getChildCount()) {
+ SpecificationNode sn = spec.getChild(i);
+ if (sn.getType().equals("startpoint")) {
+ path = sn.getAttributeValue("path");
+
+ getSession();
+ GetSeedsThread t = new GetSeedsThread(path);
+ try {
+ t.start();
+ boolean wasInterrupted = false;
try {
- other = "/" + tmp[2];
- } catch (ArrayIndexOutOfBoundsException e) {
- other = "/";
+ XThreadStringBuffer seedBuffer = t.getBuffer();
+
+ // Pick up the paths, and add them to the activities, before we join with the child thread.
+ while (true) {
+ // The only kind of exceptions this can throw are going to shut the process down.
+ String docPath = seedBuffer.fetch();
+ if (docPath == null) {
+ break;
+ }
+ // Add the pageID to the queue
+ activities.addSeedDocument(docPath);
+ }
+ } catch (InterruptedException e) {
+ wasInterrupted = true;
+ throw e;
+ } catch (ManifoldCFException e) {
+ if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) {
+ wasInterrupted = true;
+ }
+ throw e;
+ } finally {
+ if (!wasInterrupted) {
+ t.finishUp();
+ }
}
- return new URI(scheme + "://" + host + other).toURL().toString();
+ } catch (InterruptedException e) {
+ t.interrupt();
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ } catch (java.net.SocketTimeoutException e) {
+ Logging.connectors.warn("HDFS: Socket timeout adding seed documents: " + e.getMessage(), e);
+ handleIOException(e);
+ } catch (IOException e) {
+ Logging.connectors.warn("HDFS: Error adding seed documents: " + e.getMessage(), e);
+ handleIOException(e);
}
}
- return convertToURI(documentIdentifier);
- }
- catch (URISyntaxException e)
- {
- throw new ManifoldCFException("Bad url",e);
- }
- catch (IOException e)
- {
- throw new ManifoldCFException("Bad url",e);
+ i++;
}
}
-
- /** Convert a document identifier to a URI. The URI is the URI that will be the unique key from
- * the search index, and will be presented to the user as part of the search results.
- *@param documentIdentifier is the document identifier.
- *@return the document uri.
- */
- protected String convertToURI(String documentIdentifier)
- throws ManifoldCFException
- {
- //
- // Note well: This MUST be a legal URI!!!
- return new Path(documentIdentifier).toUri().toString();
- }
-
-
- /** Given a document specification, get either a list of starting document identifiers (seeds),
- * or a list of changes (deltas), depending on whether this is a "crawled" connector or not.
- * These document identifiers will be loaded into the job's queue at the beginning of the
- * job's execution.
- * This method can return changes only (because it is provided a time range). For full
- * recrawls, the start time is always zero.
- * Note that it is always ok to return MORE documents rather than less with this method.
- *@param spec is a document specification (that comes from the job).
- *@param startTime is the beginning of the time range to consider, inclusive.
- *@param endTime is the end of the time range to consider, exclusive.
- *@return the stream of local document identifiers that should be added to the queue.
- */
- @Override
- public IDocumentIdentifierStream getDocumentIdentifiers(DocumentSpecification spec, long startTime, long endTime)
- throws ManifoldCFException
- {
- return new IdentifierStream(spec);
- }
-
/** Get document versions given an array of document identifiers.
* This method is called for EVERY document that is considered. It is therefore important to perform
@@ -289,61 +429,48 @@ public class HDFSRepositoryConnector ext
}
String[] rval = new String[documentIdentifiers.length];
- try
- {
- i = 0;
- while (i < rval.length)
- {
- Path path = new Path(documentIdentifiers[i]);
- if (fileSystem.exists(path))
- {
- if (fileSystem.getFileStatus(path).isDir())
- {
- // It's a directory. The version ID will be the
- // last modified date.
- long lastModified = fileSystem.getFileStatus(path).getModificationTime();
+ for (i = 0; i < rval.length; i++) {
+ getSession();
+ GetObjectThread objt = new GetObjectThread(documentIdentifiers[i]);
+ try {
+ objt.start();
+ objt.finishUp();
+ } catch (InterruptedException e) {
+ objt.interrupt();
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ }
+
+ try {
+ Path path = objt.getResponse();
+ if (session.getFileSystem().exists(path)) {
+ if (session.getFileSystem().getFileStatus(path).isDir()) {
+ long lastModified = session.getFileSystem().getFileStatus(path).getModificationTime();
rval[i] = new Long(lastModified).toString();
-
- // Signal that we don't have any versioning.
- // rval[i] = "";
- }
- else
- {
- // It's a file
- long fileLength = fileSystem.getFileStatus(path).getLen();
- if (activities.checkLengthIndexable(fileLength))
- {
- // Get the file's modified date.
- long lastModified = fileSystem.getFileStatus(path).getModificationTime();
+ } else {
+ long fileLength = session.getFileSystem().getFileStatus(path).getLen();
+ if (activities.checkLengthIndexable(fileLength)) {
+ long lastModified = session.getFileSystem().getFileStatus(path).getModificationTime();
StringBuilder sb = new StringBuilder();
- if (filePathToUri)
- {
+ if (filePathToUri) {
sb.append("+");
- }
- else
- {
+ } else {
sb.append("-");
}
sb.append(new Long(lastModified).toString()).append(":").append(new Long(fileLength).toString());
rval[i] = sb.toString();
- }
- else
- {
+ } else {
rval[i] = null;
}
}
- }
- else
- {
+ } else {
rval[i] = null;
}
- i++;
+ } catch (IOException e) {
+ objt.interrupt();
+ throw new ManifoldCFException(e);
}
}
- catch (IOException e)
- {
- throw new ManifoldCFException(e);
- }
+
return rval;
}
@@ -371,8 +498,8 @@ public class HDFSRepositoryConnector ext
String version = versions[i];
String documentIdentifier = documentIdentifiers[i];
Path path = new Path(documentIdentifier);
- FileStatus fileStatus = fileSystem.getFileStatus(path);
- if (fileSystem.exists(path))
+ FileStatus fileStatus = session.getFileSystem().getFileStatus(path);
+ if (session.getFileSystem().exists(path))
{
if (fileStatus.isDir())
{
@@ -385,7 +512,7 @@ public class HDFSRepositoryConnector ext
{
try
{
- FileStatus[] fileStatuses = fileSystem.listStatus(path);
+ FileStatus[] fileStatuses = session.getFileSystem().listStatus(path);
if (fileStatuses != null)
{
int j = 0;
@@ -393,7 +520,7 @@ public class HDFSRepositoryConnector ext
{
FileStatus fs = fileStatuses[j++];
String canonicalPath = fs.getPath().toString();
- if (checkInclude(fileSystem.getUri().toString(),fs,canonicalPath,spec))
+ if (checkInclude(session.getFileSystem().getUri().toString(),fs,canonicalPath,spec))
activities.addDocumentReference(canonicalPath,documentIdentifier,RELATIONSHIP_CHILD);
}
}
@@ -416,7 +543,7 @@ public class HDFSRepositoryConnector ext
{
// We've already avoided queuing documents that we don't want, based on file specifications.
// We still need to check based on file data.
- if (checkIngest(fileSystem.getUri().toString(),fileStatus,spec))
+ if (checkIngest(session.getFileSystem().getUri().toString(),fileStatus,spec))
{
int j = 0;
@@ -431,7 +558,7 @@ public class HDFSRepositoryConnector ext
if (sn.getType().equals("startpoint"))
{
if (sn.getAttributeValue("path").length() > 0) {
- repositoryPaths.add(fileSystem.getUri().resolve(sn.getAttributeValue("path")).toString());
+ repositoryPaths.add(session.getFileSystem().getUri().resolve(sn.getAttributeValue("path")).toString());
}
}
}
@@ -454,7 +581,7 @@ public class HDFSRepositoryConnector ext
// Ingest the document.
try
{
- FSDataInputStream is = fileSystem.open(path);
+ FSDataInputStream is = session.getFileSystem().open(path);
try
{
long fileBytes = fileStatus.getLen();
@@ -504,18 +631,6 @@ public class HDFSRepositoryConnector ext
}
}
- /** Map an extension to a mime type */
- protected static String mapExtensionToMimeType(String fileName)
- {
- int slashIndex = fileName.lastIndexOf("/");
- if (slashIndex != -1)
- fileName = fileName.substring(slashIndex+1);
- int dotIndex = fileName.lastIndexOf(".");
- if (dotIndex == -1)
- return null;
- return ExtensionMimeMap.mapToMimeType(fileName.substring(dotIndex+1).toLowerCase(java.util.Locale.ROOT));
- }
-
// UI support methods.
//
// These support methods come in two varieties. The first bunch is involved in setting up connection configuration information. The second bunch
@@ -1206,7 +1321,87 @@ public class HDFSRepositoryConnector ext
// Protected static methods
- /** Check if a file or directory should be included, given a document specification.
+ /** Convert a document identifier to a URI. The URI is the URI that will be the unique key from
+ * the search index, and will be presented to the user as part of the search results.
+ *@param filePath is the document filePath.
+ *@param repositoryPath is the document repositoryPath.
+ *@return the document uri.
+ */
+ protected String convertToURI(String documentIdentifier, String[] repositoryPaths)
+ throws ManifoldCFException
+ {
+ //
+ // Note well: This MUST be a legal URI!!!
+ try
+ {
+ String path = new Path(documentIdentifier).toString();
+ for (String repositoryPath : repositoryPaths) {
+ if (path.startsWith(repositoryPath)) {
+ StringBuffer sb = new StringBuffer();
+ path = path.replaceFirst(repositoryPath, "");
+ if (path.startsWith("/")) {
+ path = path.replaceFirst("/", "");
+ }
+ String[] tmp = path.split("/", 3);
+ String scheme = "";
+ String host = "";
+ String other = "";
+ try {
+ scheme = tmp[0];
+ } catch (ArrayIndexOutOfBoundsException e) {
+ scheme = "hdfs";
+ }
+ try {
+ host = tmp[1];
+ } catch (ArrayIndexOutOfBoundsException e) {
+ host = "localhost:9000";
+ }
+ try {
+ other = "/" + tmp[2];
+ } catch (ArrayIndexOutOfBoundsException e) {
+ other = "/";
+ }
+ return new URI(scheme + "://" + host + other).toURL().toString();
+ }
+ }
+ return convertToURI(documentIdentifier);
+ }
+ catch (URISyntaxException e)
+ {
+ throw new ManifoldCFException("Bad url",e);
+ }
+ catch (IOException e)
+ {
+ throw new ManifoldCFException("Bad url",e);
+ }
+ }
+
+/** Convert a document identifier to a URI. The URI is the URI that will be the unique key from
+ * the search index, and will be presented to the user as part of the search results.
+ *@param documentIdentifier is the document identifier.
+ *@return the document uri.
+ */
+ protected String convertToURI(String documentIdentifier)
+ throws ManifoldCFException
+ {
+ //
+ // Note well: This MUST be a legal URI!!!
+ return new Path(documentIdentifier).toUri().toString();
+ }
+
+/** Map an extension to a mime type */
+ protected static String mapExtensionToMimeType(String fileName)
+ {
+ int slashIndex = fileName.lastIndexOf("/");
+ if (slashIndex != -1)
+ fileName = fileName.substring(slashIndex+1);
+ int dotIndex = fileName.lastIndexOf(".");
+ if (dotIndex == -1)
+ return null;
+ return ExtensionMimeMap.mapToMimeType(fileName.substring(dotIndex+1).toLowerCase(java.util.Locale.ROOT));
+ }
+
+/** Check if a file or directory should be included, given a document specification.
*@param fileName is the canonical file name.
*@param documentSpecification is the specification.
*@return true if it should be included.
@@ -1436,68 +1631,251 @@ public class HDFSRepositoryConnector ext
}
}
- /** Document identifier stream.
- */
- protected static class IdentifierStream implements IDocumentIdentifierStream
+ /**
+ * @param e
+ * @throws ManifoldCFException
+ * @throws ServiceInterruption
+ */
+ private static void handleIOException(IOException e) throws ManifoldCFException, ServiceInterruption {
+ if (!(e instanceof java.net.SocketTimeoutException) && (e instanceof InterruptedIOException)) {
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+ }
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("IO exception: "+e.getMessage(), e, currentTime + 300000L, currentTime + 3 * 60 * 60000L,-1,false);
+ }
+
+ /**
+ * @param e
+ * @throws ManifoldCFException
+ * @throws ServiceInterruption
+ */
+ private static void handleGeneralSecurityException(GeneralSecurityException e) throws ManifoldCFException, ServiceInterruption {
+ // Permanent problem: can't initialize transport layer
+ throw new ManifoldCFException("HDFS exception: "+e.getMessage(), e);
+ }
+
+ protected class CheckConnectionThread extends Thread {
+ protected Throwable exception = null;
+
+ public CheckConnectionThread() {
+ super();
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ session.getRepositoryInfo();
+ } catch (Throwable e) {
+ this.exception = e;
+ }
+ }
+
+ public Throwable getException() {
+ return exception;
+ }
+ }
+
+ protected class GetSessionThread extends Thread {
+ protected Throwable exception = null;
+
+ public GetSessionThread() {
+ super();
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ // Create a session
+ session = new HDFSSession(nameNode, config, user);
+ } catch (Throwable e) {
+ this.exception = e;
+ }
+ }
+
+ public Throwable getException() {
+ return exception;
+ }
+ }
+
+ protected class GetSeedsThread extends Thread {
+ protected Throwable exception = null;
+ protected final String path;
+ protected final XThreadStringBuffer seedBuffer;
+
+ public GetSeedsThread(String path) {
+ super();
+ this.path = path;
+ this.seedBuffer = new XThreadStringBuffer();
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ session.getSeeds(seedBuffer, path);
+ seedBuffer.signalDone();
+ } catch (Throwable e) {
+ this.exception = e;
+ }
+ }
+
+ public XThreadStringBuffer getBuffer() {
+ return seedBuffer;
+ }
+
+ public void finishUp() throws InterruptedException {
+ seedBuffer.abandon();
+ join();
+ Throwable thr = exception;
+ if (thr != null) {
+ if (thr instanceof RuntimeException) {
+ throw (RuntimeException) thr;
+ } else if (thr instanceof Error) {
+ throw (Error) thr;
+ } else {
+ throw new RuntimeException("Unhandled exception of type: "+thr.getClass().getName(),thr);
+ }
+ }
+ }
+ }
+
+ protected class GetObjectThread extends Thread {
+ protected final String nodeId;
+ protected Throwable exception = null;
+ protected Path response = null;
+
+ public GetObjectThread(String nodeId) {
+ super();
+ setDaemon(true);
+ this.nodeId = nodeId;
+ }
+
+ public void run() {
+ try {
+ response = session.getObject(nodeId);
+ } catch (Throwable e) {
+ this.exception = e;
+ }
+ }
+
+ public void finishUp() throws InterruptedException {
+ join();
+ Throwable thr = exception;
+ if (thr != null) {
+ if (thr instanceof RuntimeException) {
+ throw (RuntimeException) thr;
+ } else if (thr instanceof Error) {
+ throw (Error) thr;
+ } else {
+ throw new RuntimeException("Unhandled exception of type: "+thr.getClass().getName(),thr);
+ }
+ }
+ }
+
+ public Path getResponse() {
+ return response;
+ }
+
+ public Throwable getException() {
+ return exception;
+ }
+ }
+
+ protected class BackgroundStreamThread extends Thread
{
- protected String[] ids = null;
- protected int currentIndex = 0;
+ protected final String nodeId;
+
+ protected boolean abortThread = false;
+ protected Throwable responseException = null;
+ protected InputStream sourceStream = null;
+ protected XThreadInputStream threadStream = null;
+
+ public BackgroundStreamThread(String nodeId)
+ {
+ super();
+ setDaemon(true);
+ this.nodeId = nodeId;
+ }
- public IdentifierStream(DocumentSpecification spec)
- throws ManifoldCFException
+ public void run()
{
- // Walk the specification for the "startpoint" types. Amalgamate these into a list of strings.
- // Presume that all roots are startpoint nodes
- int i = 0;
- int j = 0;
- while (i < spec.getChildCount())
- {
- SpecificationNode n = spec.getChild(i);
- if (n.getType().equals("startpoint"))
- {
- j++;
+ try {
+ try {
+ synchronized (this) {
+ if (!abortThread) {
+ sourceStream = session.getFSDataInputStream(nodeId);
+ threadStream = new XThreadInputStream(sourceStream);
+ this.notifyAll();
+ }
+ }
+
+ if (threadStream != null)
+ {
+ // Stuff the content until we are done
+ threadStream.stuffQueue();
+ }
+ } finally {
+ if (sourceStream != null) {
+ sourceStream.close();
+ }
}
- i++;
+ } catch (Throwable e) {
+ responseException = e;
}
- ids = new String[j];
- i = 0;
- j = 0;
- while (i < ids.length)
+ }
+
+ public InputStream getSafeInputStream() throws InterruptedException, IOException
+ {
+ // Must wait until stream is created, or until we note an exception was thrown.
+ while (true)
{
- SpecificationNode n = spec.getChild(i);
- if (n.getType().equals("startpoint"))
+ synchronized (this)
{
- // The id returned MUST be in canonical form!!!
- ids[j] = new Path(n.getAttributeValue("path")).toString();
- if (Logging.connectors.isDebugEnabled())
- {
- Logging.connectors.debug("Seed = '"+ids[j]+"'");
+ if (responseException != null) {
+ throw new IllegalStateException("Check for response before getting stream");
}
- j++;
+ checkException(responseException);
+ if (threadStream != null) {
+ return threadStream;
+ }
+ wait();
}
- i++;
}
}
+
+ public void finishUp() throws InterruptedException, IOException
+ {
+ // This will be called during the finally
+ // block in the case where all is well (and
+ // the stream completed) and in the case where
+ // there were exceptions.
+ synchronized (this) {
+ if (threadStream != null) {
+ threadStream.abort();
+ }
+ abortThread = true;
+ }
+
+ join();
- /** Get the next identifier.
- *@return the next document identifier, or null if there are no more.
- */
- public String getNextIdentifier()
- throws ManifoldCFException, ServiceInterruption
- {
- if (currentIndex == ids.length)
- return null;
- return ids[currentIndex++];
- }
-
- /** Close the stream.
- */
- public void close()
- throws ManifoldCFException
+ checkException(responseException);
+ }
+
+ protected synchronized void checkException(Throwable exception) throws IOException
{
- ids = null;
+ if (exception != null)
+ {
+ Throwable e = exception;
+ if (e instanceof IOException) {
+ throw (IOException)e;
+ } else if (e instanceof RuntimeException) {
+ throw (RuntimeException)e;
+ } else if (e instanceof Error) {
+ throw (Error)e;
+ } else {
+ throw new RuntimeException("Unhandled exception of type: "+e.getClass().getName(),e);
+ }
+ }
}
-
}
-
}
Added: manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSSession.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSSession.java?rev=1496377&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSSession.java (added)
+++ manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSSession.java Tue Jun 25 08:23:03 2013
@@ -0,0 +1,108 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.manifoldcf.crawler.connectors.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.manifoldcf.core.common.*;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Session wrapper around a Hadoop FileSystem connection, used by the HDFS
+ * repository connector to report connection details, enumerate directory
+ * seeds, and open per-document input streams.
+ *
+ * @author andrew
+ */
+public class HDFSSession {
+
+ // Live HDFS handle; opened in the constructor, released by close().
+ private FileSystem fileSystem;
+ // Connection parameters, retained for getRepositoryInfo() reporting.
+ private String nameNode;
+ private Configuration config;
+ private String user;
+
+ /** Open a connection to HDFS.
+ *@param nameNode name node URI, e.g. hdfs://host:port.
+ *@param config Hadoop configuration used for the connection.
+ *@param user user name to connect as.
+ *@throws URISyntaxException if nameNode is not a valid URI.
+ *@throws IOException if the file system cannot be obtained.
+ *@throws InterruptedException if the connection attempt is interrupted.
+ */
+ public HDFSSession(String nameNode, Configuration config, String user) throws URISyntaxException, IOException, InterruptedException {
+ this.nameNode = nameNode;
+ this.config = config;
+ this.user = user;
+ fileSystem = FileSystem.get(new URI(nameNode), config, user);
+ }
+
+ /** Gather human-readable connection details for status display.
+ *@return a map of display name to value describing this connection.
+ */
+ public Map<String, String> getRepositoryInfo() {
+ Map<String, String> info = new HashMap<String, String>();
+
+ info.put("Name Node", nameNode);
+ info.put("Config", config.toString());
+ info.put("User", user);
+ info.put("Canonical Service Name", fileSystem.getCanonicalServiceName());
+ info.put("Default Block Size", Long.toString(fileSystem.getDefaultBlockSize()));
+ info.put("Default Replication", Short.toString(fileSystem.getDefaultReplication()));
+ info.put("Home Directory", fileSystem.getHomeDirectory().toUri().toString());
+ info.put("Working Directory", fileSystem.getWorkingDirectory().toUri().toString());
+ return info;
+ }
+
+ /** Push the given path, plus its immediate subdirectories, onto the
+ * cross-thread seed buffer. Plain files under the path are not added
+ * here; they are discovered when each directory seed is processed.
+ *@param idBuffer buffer that hands identifiers across to the seeding thread.
+ *@param path directory (or single-file) path to seed from.
+ *@throws IOException if the directory listing fails.
+ *@throws InterruptedException if adding to the buffer is interrupted.
+ */
+ public void getSeeds(XThreadStringBuffer idBuffer, String path)
+ throws IOException, InterruptedException {
+
+ /*
+ * need to add root dir so that single files such as /file1 will still get read
+ */
+ idBuffer.add(path);
+
+ /*
+ * gets a list of the contents of the entire folder: subfolders + files
+ */
+ FileStatus[] fileStatuses = fileSystem.listStatus(new Path(path));
+ for (FileStatus fileStatus : fileStatuses) {
+ /*
+ * only add the directories as seeds, we'll add the files later
+ */
+ if (fileStatus.isDir()) {
+ idBuffer.add(fileStatus.getPath().toUri().toString());
+ }
+ }
+ }
+
+ /** @return the underlying Hadoop file system handle. */
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ /** Convert a document identifier into an HDFS Path.
+ *@param id the document identifier (an HDFS path string).
+ *@return the corresponding Path object.
+ */
+ public Path getObject(String id) {
+ return new Path(id);
+ }
+
+ /** Open a read stream for the document with the given identifier.
+ *@param id the document identifier (an HDFS path string).
+ *@return an open input stream; the caller is responsible for closing it.
+ *@throws IOException if the file cannot be opened.
+ */
+ public FSDataInputStream getFSDataInputStream(String id) throws IOException {
+ return fileSystem.open(new Path(id));
+ }
+
+ /** Release the underlying file system connection. */
+ public void close() throws IOException {
+ fileSystem.close();
+ }
+}
Propchange: manifoldcf/branches/CONNECTORS-728/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSSession.java
------------------------------------------------------------------------------
svn:mime-type = text/plain