You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@marmotta.apache.org by wi...@apache.org on 2013/08/27 09:22:37 UTC
[3/3] git commit: MARMOTTA-145: fixed nio usage
MARMOTTA-145: fixed nio usage
Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/14c3cf48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/14c3cf48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/14c3cf48
Branch: refs/heads/develop
Commit: 14c3cf481cbf9779f89da39ff01562a594737bfe
Parents: dc2b5ac
Author: Sergio Fernández <wi...@apache.org>
Authored: Tue Aug 27 09:21:20 2013 +0200
Committer: Sergio Fernández <wi...@apache.org>
Committed: Tue Aug 27 09:21:20 2013 +0200
----------------------------------------------------------------------
.../importer/ImportWatchServiceImpl.java | 145 ++++++++++---------
1 file changed, 76 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/14c3cf48/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
index ed50ffc..dee9081 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
@@ -48,96 +48,103 @@ import org.slf4j.Logger;
* Implementation for watching import directory
*
* @author Sergio Fernández
- *
+ *
*/
@ApplicationScoped
public class ImportWatchServiceImpl implements ImportWatchService {
-
+
private static final String TASK_GROUP = "ImportWatch";
-
+
@Inject
- private Logger log;
-
+ private Logger log;
+
@Inject
private TaskManagerService taskManagerService;
-
+
@Inject
- private ConfigurationService configurationService;
-
+ private ConfigurationService configurationService;
+
@Inject
- private ImportService importService;
-
+ private ImportService importService;
+
@Inject
private ContextService contextService;
-
+
@Inject
private UserService userService;
-
+
private String path;
-
+
+ private int count;
+
public ImportWatchServiceImpl() {
-
+ count = 0;
}
-
+
@Override
- public void initialize(@Observes SystemStartupEvent event) {
- final String import_watch_path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
+ public void initialize(@Observes SystemStartupEvent event) {
+ final String import_watch_path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
this.path = import_watch_path;
-
- Runnable r = new Runnable() {
-
- @Override
- public void run() {
- final Task task = taskManagerService.createTask("Directory import watch", TASK_GROUP);
- task.updateMessage("watching...");
- task.updateDetailMessage("path", import_watch_path);
-
- try {
- Path path = Paths.get(import_watch_path);
- WatchService watchService = path.getFileSystem().newWatchService();
- path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
- while (true) {
- final WatchKey key = watchService.take();
- for (WatchEvent<?> watchEvent : key.pollEvents()) {
- if (StandardWatchEventKinds.ENTRY_CREATE.equals(watchEvent.kind())) { //TODO: is it necessary?
- @SuppressWarnings("unchecked") final Path item = ((WatchEvent<Path>) watchEvent).context();
- task.updateMessage("importing...");
- task.updateDetailMessage("path", item.toString());
- if (execImport(item)) {
- log.info("Sucessfully imported file '{}'!", item.toString());
- Files.delete(item);
- }
- task.updateMessage("watching...");
- task.updateDetailMessage("path", import_watch_path);
- }
- }
- if (!key.reset()) {
- // exit loop if the key is not valid
- // e.g. if the directory was deleted
- break;
- }
- }
- } catch (IOException e) {
- log.error("Error registering the import watch service over '{}': {}", import_watch_path, e.getMessage());
- } catch (InterruptedException e) {
- log.error("Import watch service has been interrupted");
- }
- }
- };
-
- Thread t = new Thread(r);
- t.setName(TASK_GROUP+"(start:" + new Date() + ",path:" + this.path + ")");
- t.setDaemon(true);
- t.start();
-
- }
-
+
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ final Task task = taskManagerService.createTask("Directory import watch", TASK_GROUP);
+ task.updateMessage("watching...");
+ task.updateDetailMessage("path", import_watch_path);
+
+ try {
+ Path dir = Paths.get(path);
+ WatchService watchService = dir.getFileSystem().newWatchService();
+ dir.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
+ while (true) {
+ final WatchKey key = watchService.take();
+ for (WatchEvent<?> event : key.pollEvents()) {
+ if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { // TODO: is it necessary?
+ @SuppressWarnings("unchecked")
+ Path item = dir.resolve(((WatchEvent<Path>) event).context());
+ log.debug("Importing '{}'...", item.toString());
+ task.updateMessage("importing...");
+ task.updateDetailMessage("path", item.toString());
+ if (execImport(item)) {
+ log.info("Sucessfully imported file '{}'!", item.toString());
+ Files.delete(item);
+ }
+ task.updateProgress(++count);
+ task.updateMessage("watching...");
+ task.updateDetailMessage("path", import_watch_path);
+ }
+ }
+ if (!key.reset()) {
+ // exit loop if the key is not valid
+ // e.g. if the directory was deleted
+ break;
+ }
+ }
+ } catch (IOException e) {
+ log.error(
+ "Error registering the import watch service over '{}': {}",
+ import_watch_path, e.getMessage());
+ } catch (InterruptedException e) {
+ log.error("Import watch service has been interrupted");
+ }
+ }
+ };
+
+ Thread t = new Thread(r);
+ t.setName(TASK_GROUP + "(start:" + new Date() + ",path:" + this.path + ")");
+ t.setDaemon(true);
+ t.start();
+
+ }
+
@Override
public boolean execImport(Path item) {
try {
- importService.importData(Files.newInputStream(item),
- Rio.getParserFormatForFileName(item.toString()).getDefaultMIMEType(),
- userService.getAdminUser(),
+ importService.importData(Files.newInputStream(item), Rio
+ .getParserFormatForFileName(item.toString())
+ .getDefaultMIMEType(), userService.getAdminUser(),
contextService.getDefaultContext());
return true;
} catch (MarmottaImportException e) {