You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by wi...@apache.org on 2013/08/27 09:22:36 UTC
[2/3] git commit: MARMOTTA-145: integrated import watch service with the task manager
MARMOTTA-145: integrated import watch service with the task manager
Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/dc2b5acc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/dc2b5acc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/dc2b5acc
Branch: refs/heads/develop
Commit: dc2b5accaf6dcb10745f0b0501be3c9d19a9a79e
Parents: 091e48c
Author: Sergio Fernández <wi...@apache.org>
Authored: Tue Aug 27 08:49:58 2013 +0200
Committer: Sergio Fernández <wi...@apache.org>
Committed: Tue Aug 27 08:49:58 2013 +0200
----------------------------------------------------------------------
.../importer/ImportWatchServiceImpl.java | 84 +++++++++++++-------
.../core/services/io/MarmottaIOServiceImpl.java | 5 +-
.../platform/core/services/task/TaskImpl.java | 2 +-
.../core/webservices/io/ImportWebService.java | 2 +
4 files changed, 61 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/dc2b5acc/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
index b088902..ed50ffc 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
@@ -26,6 +26,7 @@ import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.Date;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
@@ -34,6 +35,8 @@ import javax.inject.Inject;
import org.apache.marmotta.platform.core.api.config.ConfigurationService;
import org.apache.marmotta.platform.core.api.importer.ImportService;
import org.apache.marmotta.platform.core.api.importer.ImportWatchService;
+import org.apache.marmotta.platform.core.api.task.Task;
+import org.apache.marmotta.platform.core.api.task.TaskManagerService;
import org.apache.marmotta.platform.core.api.triplestore.ContextService;
import org.apache.marmotta.platform.core.api.user.UserService;
import org.apache.marmotta.platform.core.events.SystemStartupEvent;
@@ -50,10 +53,15 @@ import org.slf4j.Logger;
@ApplicationScoped
public class ImportWatchServiceImpl implements ImportWatchService {
+ private static final String TASK_GROUP = "ImportWatch";
+
@Inject
private Logger log;
@Inject
+ private TaskManagerService taskManagerService;
+
+ @Inject
private ConfigurationService configurationService;
@Inject
@@ -73,33 +81,55 @@ public class ImportWatchServiceImpl implements ImportWatchService {
@Override
public void initialize(@Observes SystemStartupEvent event) {
- this.path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
- try {
- Path path = Paths.get(this.path);
- WatchService watchService = path.getFileSystem().newWatchService();
- path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
- while (true) {
- final WatchKey key = watchService.take();
- for (WatchEvent<?> watchEvent : key.pollEvents()) {
- if (StandardWatchEventKinds.ENTRY_CREATE.equals(watchEvent.kind())) { //TODO: is it necessary?
- @SuppressWarnings("unchecked") final Path item = ((WatchEvent<Path>) watchEvent).context();
- if (execImport(item)) {
- log.info("Sucessfully imported file '{}'!", item.toString());
- Files.delete(item);
- }
- }
- }
- if (!key.reset()) {
- // exit loop if the key is not valid
- // e.g. if the directory was deleted
- break;
- }
- }
- } catch (IOException e) {
- log.error("Error registering the import watch service over '{}': {}", this.path, e.getMessage());
- } catch (InterruptedException e) {
- log.error("Import watch service has been interrupted");
- }
+ final String import_watch_path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
+ this.path = import_watch_path;
+
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ final Task task = taskManagerService.createTask("Directory import watch", TASK_GROUP);
+ task.updateMessage("watching...");
+ task.updateDetailMessage("path", import_watch_path);
+
+ try {
+ Path path = Paths.get(import_watch_path);
+ WatchService watchService = path.getFileSystem().newWatchService();
+ path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
+ while (true) {
+ final WatchKey key = watchService.take();
+ for (WatchEvent<?> watchEvent : key.pollEvents()) {
+ if (StandardWatchEventKinds.ENTRY_CREATE.equals(watchEvent.kind())) { //TODO: is it necessary?
+ @SuppressWarnings("unchecked") final Path item = ((WatchEvent<Path>) watchEvent).context();
+ task.updateMessage("importing...");
+ task.updateDetailMessage("path", item.toString());
+ if (execImport(item)) {
+ log.info("Sucessfully imported file '{}'!", item.toString());
+ Files.delete(item);
+ }
+ task.updateMessage("watching...");
+ task.updateDetailMessage("path", import_watch_path);
+ }
+ }
+ if (!key.reset()) {
+ // exit loop if the key is not valid
+ // e.g. if the directory was deleted
+ break;
+ }
+ }
+ } catch (IOException e) {
+ log.error("Error registering the import watch service over '{}': {}", import_watch_path, e.getMessage());
+ } catch (InterruptedException e) {
+ log.error("Import watch service has been interrupted");
+ }
+ }
+ };
+
+ Thread t = new Thread(r);
+ t.setName(TASK_GROUP+"(start:" + new Date() + ",path:" + this.path + ")");
+ t.setDaemon(true);
+ t.start();
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/dc2b5acc/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java
index f96de33..3c8d9b6 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java
@@ -65,12 +65,8 @@ public class MarmottaIOServiceImpl implements MarmottaIOService {
producedTypes.addAll(format.getMIMETypes());
}
log.info(" - available writers: {}", Arrays.toString(producedTypes.toArray()));
-
-
-
}
-
/**
* returns a list of all mimetypes which can be parsed by implemented parsers
* @return
@@ -108,4 +104,5 @@ public class MarmottaIOServiceImpl implements MarmottaIOService {
public RDFFormat getParser(String mimetype) {
return parserRegistry.getFileFormatForMIMEType(mimetype);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/dc2b5acc/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java
index 56db715..f0ce2b5 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java
@@ -20,7 +20,6 @@ package org.apache.marmotta.platform.core.services.task;
import org.apache.marmotta.platform.core.api.task.Task;
import org.apache.marmotta.platform.core.api.task.TaskManagerService;
-
class TaskImpl extends Task {
private static final String WAITING_DETAIL = "Waiting in status";
@@ -49,4 +48,5 @@ class TaskImpl extends Task {
public void subTaskEnded() {
updateMessage(detailMessages.remove(WAITING_DETAIL));
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/dc2b5acc/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java
index 26ca893..9b0b36d 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java
@@ -272,6 +272,7 @@ public class ImportWebService {
}
protected static class Status {
+
boolean isRunning;
String status;
String message;
@@ -307,6 +308,7 @@ public class ImportWebService {
public void setStatus(String status) {
this.status = status;
}
+
}
}