You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by wi...@apache.org on 2013/08/27 16:17:53 UTC
[2/3] git commit: MARMOTTA-291: added support to import into named
contexts using the relative path to the import folder
MARMOTTA-291: added support to import into named contexts using the relative path to the import folder
Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/d9c4190e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/d9c4190e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/d9c4190e
Branch: refs/heads/develop
Commit: d9c4190ed978d9e75bcd5bc000b3994df2cdb91f
Parents: bac8dee
Author: Sergio Fernández <wi...@apache.org>
Authored: Tue Aug 27 16:13:49 2013 +0200
Committer: Sergio Fernández <wi...@apache.org>
Committed: Tue Aug 27 16:13:49 2013 +0200
----------------------------------------------------------------------
.../core/api/importer/ImportWatchService.java | 8 +-
.../importer/ImportWatchServiceImpl.java | 107 ++++++++++++++-----
2 files changed, 84 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/d9c4190e/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
index 1906550..6472276 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
@@ -17,9 +17,10 @@
*/
package org.apache.marmotta.platform.core.api.importer;
-import java.nio.file.Path;
+import java.io.File;
import org.apache.marmotta.platform.core.events.SystemStartupEvent;
+import org.openrdf.model.URI;
/**
* A service for watching import directory
@@ -38,9 +39,10 @@ public interface ImportWatchService {
/**
* Import an observed item
*
- * @param item
+ * @param file
+ * @param context
* @return
*/
- boolean execImport(Path item);
+ boolean execImport(File file, URI context);
}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/d9c4190e/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
index e92bf1e..0691563 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
@@ -18,20 +18,25 @@
package org.apache.marmotta.platform.core.services.importer;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
+import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
+import org.apache.commons.lang3.StringUtils;
import org.apache.marmotta.platform.core.api.config.ConfigurationService;
import org.apache.marmotta.platform.core.api.importer.ImportService;
import org.apache.marmotta.platform.core.api.importer.ImportWatchService;
@@ -41,6 +46,7 @@ import org.apache.marmotta.platform.core.api.triplestore.ContextService;
import org.apache.marmotta.platform.core.api.user.UserService;
import org.apache.marmotta.platform.core.events.SystemStartupEvent;
import org.apache.marmotta.platform.core.exception.io.MarmottaImportException;
+import org.openrdf.model.URI;
import org.openrdf.rio.Rio;
import org.slf4j.Logger;
@@ -72,19 +78,21 @@ public class ImportWatchServiceImpl implements ImportWatchService {
@Inject
private UserService userService;
+
+ private Map<WatchKey,Path> keys;
private String path;
private int count;
public ImportWatchServiceImpl() {
+ this.keys = new HashMap<WatchKey,Path>();
count = 0;
}
@Override
public void initialize(@Observes SystemStartupEvent event) {
- final String import_watch_path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
- this.path = import_watch_path;
+ this.path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
Runnable r = new Runnable() {
@@ -92,31 +100,50 @@ public class ImportWatchServiceImpl implements ImportWatchService {
public void run() {
final Task task = taskManagerService.createTask("Directory import watch", TASK_GROUP);
task.updateMessage("watching...");
- task.updateDetailMessage("path", import_watch_path);
+ task.updateDetailMessage("path", path);
try {
- Path dir = Paths.get(path);
- WatchService watchService = dir.getFileSystem().newWatchService();
- dir.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
+ Path root = Paths.get(path);
+ WatchService watcher = root.getFileSystem().newWatchService();
+ register(root, watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
while (true) {
- final WatchKey key = watchService.take();
+ final WatchKey key = watcher.take();
for (WatchEvent<?> event : key.pollEvents()) {
- if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { // TODO: is it necessary?
- @SuppressWarnings("unchecked")
- Path item = dir.resolve(((WatchEvent<Path>) event).context());
- if (!Files.isDirectory(item)) {
- log.debug("Importing '{}'...", item.toString());
+
+ @SuppressWarnings("unchecked")
+ Path item = ((WatchEvent<Path>) event).context();
+ Path dir = keys.get(key);
+ File file = new File(dir.toString(), item.toString()).getAbsoluteFile();
+
+ if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
+ if (file.isDirectory()) {
+ //recursive registration of sub-directories
+ register(Paths.get(path, item.toString()), watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
+ task.updateProgress(++count);
+ } else {
+ log.debug("Importing '{}'...", file.getAbsolutePath());
task.updateMessage("importing...");
- task.updateDetailMessage("path", item.toString());
- if (execImport(item)) {
- log.info("Sucessfully imported file '{}'!", item.toString());
- Files.delete(item);
+ task.updateDetailMessage("path", file.getAbsolutePath());
+ URI context = getTargetContext(file);
+ if (execImport(file, context)) {
+ log.info("Sucessfully imported file '{}' into {}", file.getAbsolutePath(), context.stringValue());
+ try {
+ //delete the imported file
+ log.debug("Deleting {}...", file.getAbsolutePath());
+ file.delete();
+ } catch (Exception ex) {
+ log.error("Error deleing {}: {}", file.getAbsolutePath(), ex.getMessage());
+ }
}
task.updateProgress(++count);
task.updateMessage("watching...");
- task.updateDetailMessage("path", import_watch_path);
+ task.updateDetailMessage("path", path);
}
+ } else if (StandardWatchEventKinds.ENTRY_DELETE.equals(event.kind()) && Files.isDirectory(item)) {
+ //TODO: unregister deleted directories?
+ task.updateProgress(++count);
}
+
}
if (!key.reset()) {
// exit loop if the key is not valid
@@ -125,13 +152,12 @@ public class ImportWatchServiceImpl implements ImportWatchService {
}
}
} catch (IOException e) {
- log.error(
- "Error registering the import watch service over '{}': {}",
- import_watch_path, e.getMessage());
+ log.error("Error registering the import watch service over '{}': {}", path, e.getMessage());
} catch (InterruptedException e) {
log.error("Import watch service has been interrupted");
}
}
+
};
Thread t = new Thread(r);
@@ -142,21 +168,46 @@ public class ImportWatchServiceImpl implements ImportWatchService {
}
@Override
- public boolean execImport(Path item) {
+ public boolean execImport(File file, URI context) {
try {
- importService.importData(Files.newInputStream(item), Rio
- .getParserFormatForFileName(item.toString())
- .getDefaultMIMEType(), userService.getAdminUser(),
- contextService.getDefaultContext());
+ importService.importData(new FileInputStream(file),
+ Rio.getParserFormatForFileName(file.getName()).getDefaultMIMEType(),
+ userService.getAdminUser(), context);
return true;
} catch (MarmottaImportException e) {
- log.error("Error importing file {} from the local directory: {}", item.toString(), e.getMessage());
+ log.error("Error importing file {} from the local directory: {}", file.getAbsolutePath(), e.getMessage());
return false;
} catch (IOException e) {
- e.printStackTrace();
- log.error("Error retrieving file {} from the local directory: {}", item.toString(), e.getMessage());
+ log.error("Error retrieving file {} from the local directory: {}", file.getAbsolutePath(), e.getMessage());
return false;
}
}
+
+ /**
+ * Get the target context, according to the path relative to the base import directory
+ *
+ * @param file
+ * @return
+ */
+ private URI getTargetContext(File file) {
+ String subdir = StringUtils.removeStart(file.getParentFile().getAbsolutePath(), this.path);
+ if (StringUtils.isBlank(subdir)) {
+ return contextService.getDefaultContext();
+ } else {
+ return contextService.createContext(configurationService.getBaseContext() + subdir.substring(1));
+ }
+ }
+
+ /**
+ * Registers a new path in the watcher, keeping the path mapping for future use
+ *
+ * @param path
+ * @param watcher
+ * @param events
+ * @throws IOException
+ */
+ private void register(Path path, WatchService watcher, Kind<?>... events) throws IOException {
+ keys.put(path.register(watcher, events), path);
+ }
}