You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by wi...@apache.org on 2013/08/27 16:17:53 UTC

[2/3] git commit: MARMOTTA-291: added support to import into named contexts using the relative path to the import folder

MARMOTTA-291: added support to import into named contexts using the relative path to the import folder


Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/d9c4190e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/d9c4190e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/d9c4190e

Branch: refs/heads/develop
Commit: d9c4190ed978d9e75bcd5bc000b3994df2cdb91f
Parents: bac8dee
Author: Sergio Fernández <wi...@apache.org>
Authored: Tue Aug 27 16:13:49 2013 +0200
Committer: Sergio Fernández <wi...@apache.org>
Committed: Tue Aug 27 16:13:49 2013 +0200

----------------------------------------------------------------------
 .../core/api/importer/ImportWatchService.java   |   8 +-
 .../importer/ImportWatchServiceImpl.java        | 107 ++++++++++++++-----
 2 files changed, 84 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/d9c4190e/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
index 1906550..6472276 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
@@ -17,9 +17,10 @@
  */
 package org.apache.marmotta.platform.core.api.importer;
 
-import java.nio.file.Path;
+import java.io.File;
 
 import org.apache.marmotta.platform.core.events.SystemStartupEvent;
+import org.openrdf.model.URI;
 
 /**
  * A service for watching import directory
@@ -38,9 +39,10 @@ public interface ImportWatchService {
 	/**
 	 * Import an observed item 
 	 * 
-	 * @param item
+	 * @param file
+	 * @param context
 	 * @return
 	 */
-	boolean execImport(Path item);
+	boolean execImport(File file, URI context);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/d9c4190e/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
index e92bf1e..0691563 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
@@ -18,20 +18,25 @@
 package org.apache.marmotta.platform.core.services.importer;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardWatchEventKinds;
 import java.nio.file.WatchEvent;
+import java.nio.file.WatchEvent.Kind;
 import java.nio.file.WatchKey;
 import java.nio.file.WatchService;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 
 import javax.enterprise.context.ApplicationScoped;
 import javax.enterprise.event.Observes;
 import javax.inject.Inject;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.marmotta.platform.core.api.config.ConfigurationService;
 import org.apache.marmotta.platform.core.api.importer.ImportService;
 import org.apache.marmotta.platform.core.api.importer.ImportWatchService;
@@ -41,6 +46,7 @@ import org.apache.marmotta.platform.core.api.triplestore.ContextService;
 import org.apache.marmotta.platform.core.api.user.UserService;
 import org.apache.marmotta.platform.core.events.SystemStartupEvent;
 import org.apache.marmotta.platform.core.exception.io.MarmottaImportException;
+import org.openrdf.model.URI;
 import org.openrdf.rio.Rio;
 import org.slf4j.Logger;
 
@@ -72,19 +78,21 @@ public class ImportWatchServiceImpl implements ImportWatchService {
 
 	@Inject
 	private UserService userService;
+	
+	private Map<WatchKey,Path> keys;
 
 	private String path;
 
 	private int count;
 
 	public ImportWatchServiceImpl() {
+		this.keys = new HashMap<WatchKey,Path>();
 		count = 0;
 	}
 
 	@Override
 	public void initialize(@Observes SystemStartupEvent event) {
-		final String import_watch_path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
-		this.path = import_watch_path;
+		this.path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
 
 		Runnable r = new Runnable() {
 
@@ -92,31 +100,50 @@ public class ImportWatchServiceImpl implements ImportWatchService {
 			public void run() {
 				final Task task = taskManagerService.createTask("Directory import watch", TASK_GROUP);
 				task.updateMessage("watching...");
-				task.updateDetailMessage("path", import_watch_path);
+				task.updateDetailMessage("path", path);
 
 				try {
-					Path dir = Paths.get(path);
-					WatchService watchService = dir.getFileSystem().newWatchService();
-					dir.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
+					Path root = Paths.get(path);
+					WatchService watcher = root.getFileSystem().newWatchService();
+					register(root, watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
 					while (true) {
-						final WatchKey key = watchService.take();
+						final WatchKey key = watcher.take();
 						for (WatchEvent<?> event : key.pollEvents()) {
-							if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { // TODO: is it necessary?
-								@SuppressWarnings("unchecked")
-								Path item = dir.resolve(((WatchEvent<Path>) event).context());
-								if (!Files.isDirectory(item)) {
-									log.debug("Importing '{}'...", item.toString());
+							
+							@SuppressWarnings("unchecked")
+							Path item = ((WatchEvent<Path>) event).context();
+							Path dir = keys.get(key);
+							File file = new File(dir.toString(), item.toString()).getAbsoluteFile();
+							
+							if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
+								if (file.isDirectory()) {
+									//recursive registration of sub-directories
+									register(Paths.get(path, item.toString()), watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
+									task.updateProgress(++count);
+								} else {
+									log.debug("Importing '{}'...", file.getAbsolutePath());
 									task.updateMessage("importing...");
-									task.updateDetailMessage("path", item.toString());
-									if (execImport(item)) {
-										log.info("Sucessfully imported file '{}'!", item.toString());
-										Files.delete(item);
+									task.updateDetailMessage("path", file.getAbsolutePath());
+									URI context = getTargetContext(file);
+									if (execImport(file, context)) {
+										log.info("Sucessfully imported file '{}' into {}", file.getAbsolutePath(), context.stringValue());
+										try {
+											//delete the imported file
+											log.debug("Deleting {}...", file.getAbsolutePath());
+											file.delete();
+										} catch (Exception ex) {
+											log.error("Error deleing {}: {}", file.getAbsolutePath(), ex.getMessage());
+										}
 									}
 									task.updateProgress(++count);
 									task.updateMessage("watching...");
-									task.updateDetailMessage("path", import_watch_path);
+									task.updateDetailMessage("path", path);
 								}
+							} else if (StandardWatchEventKinds.ENTRY_DELETE.equals(event.kind()) && Files.isDirectory(item)) {
+								//TODO: unregister deleted directories?
+								task.updateProgress(++count);
 							}
+							
 						}
 						if (!key.reset()) {
 							// exit loop if the key is not valid
@@ -125,13 +152,12 @@ public class ImportWatchServiceImpl implements ImportWatchService {
 						}
 					}
 				} catch (IOException e) {
-					log.error(
-							"Error registering the import watch service over '{}': {}",
-							import_watch_path, e.getMessage());
+					log.error("Error registering the import watch service over '{}': {}", path, e.getMessage());
 				} catch (InterruptedException e) {
 					log.error("Import watch service has been interrupted");
 				}
 			}
+
 		};
 
 		Thread t = new Thread(r);
@@ -142,21 +168,46 @@ public class ImportWatchServiceImpl implements ImportWatchService {
 	}
 
 	@Override
-	public boolean execImport(Path item) {
+	public boolean execImport(File file, URI context) {
 		try {
-			importService.importData(Files.newInputStream(item), Rio
-					.getParserFormatForFileName(item.toString())
-					.getDefaultMIMEType(), userService.getAdminUser(),
-					contextService.getDefaultContext());
+			importService.importData(new FileInputStream(file),
+					Rio.getParserFormatForFileName(file.getName()).getDefaultMIMEType(), 
+					userService.getAdminUser(), context);
 			return true;
 		} catch (MarmottaImportException e) {
-			log.error("Error importing file {} from the local directory: {}", item.toString(), e.getMessage());
+			log.error("Error importing file {} from the local directory: {}", file.getAbsolutePath(), e.getMessage());
 			return false;
 		} catch (IOException e) {
-			e.printStackTrace();
-			log.error("Error retrieving file {} from the local directory: {}", item.toString(), e.getMessage());
+			log.error("Error retrieving file {} from the local directory: {}", file.getAbsolutePath(), e.getMessage());
 			return false;
 		}
 	}
+	
+	/**
+	 * Get the target context, according to the path relative to the base import directory
+	 * 
+	 * @param file
+	 * @return
+	 */
+	private URI getTargetContext(File file) {
+		String subdir = StringUtils.removeStart(file.getParentFile().getAbsolutePath(), this.path);
+		if (StringUtils.isBlank(subdir)) {
+			return contextService.getDefaultContext();
+		} else {
+			return contextService.createContext(configurationService.getBaseContext() + subdir.substring(1));
+		}
+	}
+	
+	/**
+	 * Registers a new path in the watcher, keeping the path mapping for future uses
+	 * 
+	 * @param path
+	 * @param watcher
+	 * @param events
+	 * @throws IOException
+	 */
+	private void register(Path path, WatchService watcher, Kind<?>... events) throws IOException {
+		keys.put(path.register(watcher, events), path);
+	}
 
 }