You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by wi...@apache.org on 2013/08/27 09:22:36 UTC

[2/3] git commit: MARMOTTA-145: integrated import watch service with the task manager

MARMOTTA-145: integrated import watch service with the task manager


Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/dc2b5acc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/dc2b5acc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/dc2b5acc

Branch: refs/heads/develop
Commit: dc2b5accaf6dcb10745f0b0501be3c9d19a9a79e
Parents: 091e48c
Author: Sergio Fernández <wi...@apache.org>
Authored: Tue Aug 27 08:49:58 2013 +0200
Committer: Sergio Fernández <wi...@apache.org>
Committed: Tue Aug 27 08:49:58 2013 +0200

----------------------------------------------------------------------
 .../importer/ImportWatchServiceImpl.java        | 84 +++++++++++++-------
 .../core/services/io/MarmottaIOServiceImpl.java |  5 +-
 .../platform/core/services/task/TaskImpl.java   |  2 +-
 .../core/webservices/io/ImportWebService.java   |  2 +
 4 files changed, 61 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/dc2b5acc/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
index b088902..ed50ffc 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
@@ -26,6 +26,7 @@ import java.nio.file.StandardWatchEventKinds;
 import java.nio.file.WatchEvent;
 import java.nio.file.WatchKey;
 import java.nio.file.WatchService;
+import java.util.Date;
 
 import javax.enterprise.context.ApplicationScoped;
 import javax.enterprise.event.Observes;
@@ -34,6 +35,8 @@ import javax.inject.Inject;
 import org.apache.marmotta.platform.core.api.config.ConfigurationService;
 import org.apache.marmotta.platform.core.api.importer.ImportService;
 import org.apache.marmotta.platform.core.api.importer.ImportWatchService;
+import org.apache.marmotta.platform.core.api.task.Task;
+import org.apache.marmotta.platform.core.api.task.TaskManagerService;
 import org.apache.marmotta.platform.core.api.triplestore.ContextService;
 import org.apache.marmotta.platform.core.api.user.UserService;
 import org.apache.marmotta.platform.core.events.SystemStartupEvent;
@@ -50,10 +53,15 @@ import org.slf4j.Logger;
 @ApplicationScoped
 public class ImportWatchServiceImpl implements ImportWatchService {
 	
+	private static final String TASK_GROUP = "ImportWatch";
+	
 	@Inject
     private Logger log;
 	
 	@Inject
+	private TaskManagerService taskManagerService;
+	
+	@Inject
     private ConfigurationService configurationService;
 	
 	@Inject
@@ -73,33 +81,55 @@ public class ImportWatchServiceImpl implements ImportWatchService {
 	
 	@Override
     public void initialize(@Observes SystemStartupEvent event) {
-		this.path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
-        try {
-        	Path path = Paths.get(this.path);
-        	WatchService watchService = path.getFileSystem().newWatchService();
-			path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
-			while (true) {
-				final WatchKey key = watchService.take();
-				for (WatchEvent<?> watchEvent : key.pollEvents()) {
-					if (StandardWatchEventKinds.ENTRY_CREATE.equals(watchEvent.kind())) { //TODO: is it necessary?
-						@SuppressWarnings("unchecked") final Path item = ((WatchEvent<Path>) watchEvent).context();
-						if (execImport(item)) {
-							log.info("Sucessfully imported file '{}'!", item.toString());
-							Files.delete(item);
-						}
-					}
-				}
-				if (!key.reset()) {
-					// exit loop if the key is not valid
-					// e.g. if the directory was deleted
-					break;
-				}
-			}
-		} catch (IOException e) {
-			log.error("Error registering the import watch service over '{}': {}", this.path, e.getMessage());
-		} catch (InterruptedException e) {
-			log.error("Import watch service has been interrupted");
-		}
+		final String import_watch_path =  configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
+		this.path = import_watch_path;
+        	
+        Runnable r = new Runnable() {
+
+            @Override
+            public void run() {
+                final Task task = taskManagerService.createTask("Directory import watch", TASK_GROUP);
+                task.updateMessage("watching...");
+                task.updateDetailMessage("path", import_watch_path);
+                
+                try {
+	                Path path = Paths.get(import_watch_path);
+	            	WatchService watchService = path.getFileSystem().newWatchService();
+	    			path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
+	    			while (true) {
+	    				final WatchKey key = watchService.take();
+	    				for (WatchEvent<?> watchEvent : key.pollEvents()) {
+	    					if (StandardWatchEventKinds.ENTRY_CREATE.equals(watchEvent.kind())) { //TODO: is it necessary?
+	    						@SuppressWarnings("unchecked") final Path item = ((WatchEvent<Path>) watchEvent).context();
+	    	                    task.updateMessage("importing...");
+	    	                    task.updateDetailMessage("path", item.toString());
+	    						if (execImport(item)) {
+	    							log.info("Sucessfully imported file '{}'!", item.toString());
+	    							Files.delete(item);
+	    						}
+	    	                    task.updateMessage("watching...");
+	    	                    task.updateDetailMessage("path", import_watch_path);
+	    					}
+	    				}
+	    				if (!key.reset()) {
+	    					// exit loop if the key is not valid
+	    					// e.g. if the directory was deleted
+	    					break;
+	    				}
+	    			}
+        		} catch (IOException e) {
+        			log.error("Error registering the import watch service over '{}': {}", import_watch_path, e.getMessage());
+        		} catch (InterruptedException e) {
+        			log.error("Import watch service has been interrupted");
+        		}
+            }
+        };
+
+        Thread t = new Thread(r);
+        t.setName(TASK_GROUP+"(start:" + new Date() + ",path:" + this.path + ")");
+        t.setDaemon(true);
+        t.start();
+        
     }
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/dc2b5acc/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java
index f96de33..3c8d9b6 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java
@@ -65,12 +65,8 @@ public class MarmottaIOServiceImpl implements MarmottaIOService {
             producedTypes.addAll(format.getMIMETypes());
         }
         log.info(" - available writers: {}", Arrays.toString(producedTypes.toArray()));
-
-
-
     }
 
-
 	/**
 	 * returns a list of all mimetypes which can be parsed by implemented parsers
 	 * @return
@@ -108,4 +104,5 @@ public class MarmottaIOServiceImpl implements MarmottaIOService {
 	public RDFFormat getParser(String mimetype) {
 		return parserRegistry.getFileFormatForMIMEType(mimetype);
 	}
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/dc2b5acc/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java
index 56db715..f0ce2b5 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java
@@ -20,7 +20,6 @@ package org.apache.marmotta.platform.core.services.task;
 import org.apache.marmotta.platform.core.api.task.Task;
 import org.apache.marmotta.platform.core.api.task.TaskManagerService;
 
-
 class TaskImpl extends Task {
 
     private static final String WAITING_DETAIL = "Waiting in status";
@@ -49,4 +48,5 @@ class TaskImpl extends Task {
     public void subTaskEnded() {
         updateMessage(detailMessages.remove(WAITING_DETAIL));
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/dc2b5acc/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java
index 26ca893..9b0b36d 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java
@@ -272,6 +272,7 @@ public class ImportWebService {
     }
 
     protected static class Status {
+    	
         boolean isRunning;
         String status;
         String message;
@@ -307,6 +308,7 @@ public class ImportWebService {
         public void setStatus(String status) {
             this.status = status;
         }
+        
     }
 
 }