You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by wi...@apache.org on 2013/08/27 09:22:37 UTC

[3/3] git commit: MARMOTTA-145: fixed nio usage

MARMOTTA-145: fixed nio usage


Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/14c3cf48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/14c3cf48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/14c3cf48

Branch: refs/heads/develop
Commit: 14c3cf481cbf9779f89da39ff01562a594737bfe
Parents: dc2b5ac
Author: Sergio Fernández <wi...@apache.org>
Authored: Tue Aug 27 09:21:20 2013 +0200
Committer: Sergio Fernández <wi...@apache.org>
Committed: Tue Aug 27 09:21:20 2013 +0200

----------------------------------------------------------------------
 .../importer/ImportWatchServiceImpl.java        | 145 ++++++++++---------
 1 file changed, 76 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/14c3cf48/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
index ed50ffc..dee9081 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
@@ -48,96 +48,103 @@ import org.slf4j.Logger;
  * Implementation for watching import directory
  * 
  * @author Sergio Fernández
- *
+ * 
  */
 @ApplicationScoped
 public class ImportWatchServiceImpl implements ImportWatchService {
-	
+
 	private static final String TASK_GROUP = "ImportWatch";
-	
+
 	@Inject
-    private Logger log;
-	
+	private Logger log;
+
 	@Inject
 	private TaskManagerService taskManagerService;
-	
+
 	@Inject
-    private ConfigurationService configurationService;
-	
+	private ConfigurationService configurationService;
+
 	@Inject
-    private ImportService importService;
-	
+	private ImportService importService;
+
 	@Inject
 	private ContextService contextService;
-	
+
 	@Inject
 	private UserService userService;
-	
+
 	private String path;
-	
+
+	private int count;
+
 	public ImportWatchServiceImpl() {
-		
+		count = 0;
 	}
-	
+
 	@Override
-    public void initialize(@Observes SystemStartupEvent event) {
-		final String import_watch_path =  configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
+	public void initialize(@Observes SystemStartupEvent event) {
+		final String import_watch_path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
 		this.path = import_watch_path;
-        	
-        Runnable r = new Runnable() {
-
-            @Override
-            public void run() {
-                final Task task = taskManagerService.createTask("Directory import watch", TASK_GROUP);
-                task.updateMessage("watching...");
-                task.updateDetailMessage("path", import_watch_path);
-                
-                try {
-	                Path path = Paths.get(import_watch_path);
-	            	WatchService watchService = path.getFileSystem().newWatchService();
-	    			path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
-	    			while (true) {
-	    				final WatchKey key = watchService.take();
-	    				for (WatchEvent<?> watchEvent : key.pollEvents()) {
-	    					if (StandardWatchEventKinds.ENTRY_CREATE.equals(watchEvent.kind())) { //TODO: is it necessary?
-	    						@SuppressWarnings("unchecked") final Path item = ((WatchEvent<Path>) watchEvent).context();
-	    	                    task.updateMessage("importing...");
-	    	                    task.updateDetailMessage("path", item.toString());
-	    						if (execImport(item)) {
-	    							log.info("Sucessfully imported file '{}'!", item.toString());
-	    							Files.delete(item);
-	    						}
-	    	                    task.updateMessage("watching...");
-	    	                    task.updateDetailMessage("path", import_watch_path);
-	    					}
-	    				}
-	    				if (!key.reset()) {
-	    					// exit loop if the key is not valid
-	    					// e.g. if the directory was deleted
-	    					break;
-	    				}
-	    			}
-        		} catch (IOException e) {
-        			log.error("Error registering the import watch service over '{}': {}", import_watch_path, e.getMessage());
-        		} catch (InterruptedException e) {
-        			log.error("Import watch service has been interrupted");
-        		}
-            }
-        };
-
-        Thread t = new Thread(r);
-        t.setName(TASK_GROUP+"(start:" + new Date() + ",path:" + this.path + ")");
-        t.setDaemon(true);
-        t.start();
-        
-    }
-	
+
+		Runnable r = new Runnable() {
+
+			@Override
+			public void run() {
+				final Task task = taskManagerService.createTask("Directory import watch", TASK_GROUP);
+				task.updateMessage("watching...");
+				task.updateDetailMessage("path", import_watch_path);
+
+				try {
+					Path dir = Paths.get(path);
+					WatchService watchService = dir.getFileSystem().newWatchService();
+					dir.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
+					while (true) {
+						final WatchKey key = watchService.take();
+						for (WatchEvent<?> event : key.pollEvents()) {
+							if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { // TODO: is it necessary?
+								@SuppressWarnings("unchecked")
+								Path item = dir.resolve(((WatchEvent<Path>) event).context());
+								log.debug("Importing '{}'...", item.toString());
+								task.updateMessage("importing...");
+								task.updateDetailMessage("path", item.toString());
+								if (execImport(item)) {
+									log.info("Sucessfully imported file '{}'!", item.toString());
+									Files.delete(item);
+								}
+								task.updateProgress(++count);
+								task.updateMessage("watching...");
+								task.updateDetailMessage("path", import_watch_path);
+							}
+						}
+						if (!key.reset()) {
+							// exit loop if the key is not valid
+							// e.g. if the directory was deleted
+							break;
+						}
+					}
+				} catch (IOException e) {
+					log.error(
+							"Error registering the import watch service over '{}': {}",
+							import_watch_path, e.getMessage());
+				} catch (InterruptedException e) {
+					log.error("Import watch service has been interrupted");
+				}
+			}
+		};
+
+		Thread t = new Thread(r);
+		t.setName(TASK_GROUP + "(start:" + new Date() + ",path:" + this.path + ")");
+		t.setDaemon(true);
+		t.start();
+
+	}
+
 	@Override
 	public boolean execImport(Path item) {
 		try {
-			importService.importData(Files.newInputStream(item), 
-					Rio.getParserFormatForFileName(item.toString()).getDefaultMIMEType(), 
-					userService.getAdminUser(), 
+			importService.importData(Files.newInputStream(item), Rio
+					.getParserFormatForFileName(item.toString())
+					.getDefaultMIMEType(), userService.getAdminUser(),
 					contextService.getDefaultContext());
 			return true;
 		} catch (MarmottaImportException e) {