You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by wi...@apache.org on 2013/08/27 09:22:35 UTC
[1/3] git commit: MARMOTTA-145: added initial implementation of the
import watch service
Updated Branches:
refs/heads/develop 259b5d133 -> 14c3cf481
MARMOTTA-145: added initial implementation of the import watch service
Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/091e48cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/091e48cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/091e48cb
Branch: refs/heads/develop
Commit: 091e48cb74037cb911aff0dcb515a65e50f8ddbf
Parents: 259b5d1
Author: Sergio Fernández <wi...@apache.org>
Authored: Mon Aug 26 17:56:41 2013 +0200
Committer: Sergio Fernández <wi...@apache.org>
Committed: Mon Aug 26 17:56:41 2013 +0200
----------------------------------------------------------------------
.../core/api/config/ConfigurationService.java | 6 +
.../core/api/importer/ImportWatchService.java | 46 +++++++
.../config/ConfigurationServiceImpl.java | 14 ++-
.../services/importer/ImportServiceImpl.java | 1 +
.../importer/ImportWatchServiceImpl.java | 123 +++++++++++++++++++
.../services/importer/rdf/RDFImporterImpl.java | 4 -
.../core/servlet/MarmottaPostStartupFilter.java | 5 -
.../core/startup/MarmottaStartupService.java | 2 +
8 files changed, 189 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/091e48cb/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/config/ConfigurationService.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/config/ConfigurationService.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/config/ConfigurationService.java
index ef79f58..58fdab7 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/config/ConfigurationService.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/config/ConfigurationService.java
@@ -61,6 +61,12 @@ public interface ConfigurationService {
static final String CONTEXT_INFERRED = "inferred";
static final String CONTEXT_SYSTEM = "system";
+
+ static final String DIR_CONFIG = "config";
+
+ static final String DIR_LOG = "log";
+
+ static final String DIR_IMPORT = "import";
/**
* Get the base URI of the system.
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/091e48cb/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
new file mode 100644
index 0000000..1906550
--- /dev/null
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/importer/ImportWatchService.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.marmotta.platform.core.api.importer;
+
+import java.nio.file.Path;
+
+import org.apache.marmotta.platform.core.events.SystemStartupEvent;
+
+/**
+ * A service for watching import directory
+ *
+ * @author Sergio Fernández
+ *
+ */
+public interface ImportWatchService {
+
+ /**
+ * Initialize the directory watching, performing an importation
+ * of new files copied there
+ */
+ public void initialize(SystemStartupEvent event);
+
+ /**
+ * Import an observed item
+ *
+ * @param item
+ * @return
+ */
+ boolean execImport(Path item);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/091e48cb/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/config/ConfigurationServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/config/ConfigurationServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/config/ConfigurationServiceImpl.java
index f6b38dc..9642b78 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/config/ConfigurationServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/config/ConfigurationServiceImpl.java
@@ -79,7 +79,7 @@ import com.google.common.io.Resources;
@ApplicationScoped
public class ConfigurationServiceImpl implements ConfigurationService {
- private String home;
+ private String home;
private static Logger log = LoggerFactory.getLogger(ConfigurationService.class);
@@ -181,16 +181,23 @@ public class ConfigurationServiceImpl implements ConfigurationService {
f1.mkdirs();
}
// ensure directory for user configuration files
- File f2 = new File(getHome() + File.separator + "config");
+ File f2 = new File(getHome() + File.separator + DIR_CONFIG);
if(!f2.exists()) {
f2.mkdirs();
}
// ensure directory for logging messages
- File f3 = new File(getHome() + File.separator + "log");
+ File f3 = new File(getHome() + File.separator + DIR_LOG);
if(!f3.exists()) {
f3.mkdirs();
}
+
+ // ensure directory for importing data
+ File f4 = new File(getHome() + File.separator + DIR_IMPORT);
+ if(!f4.exists()) {
+ f4.mkdirs();
+ }
+
}
// the save configuration will be in the home directory
@@ -308,6 +315,7 @@ public class ConfigurationServiceImpl implements ConfigurationService {
log.info("Apache Marmotta Configuration Service: initialisation completed");
configurationInitEvent.fire(new ConfigurationServiceInitEvent());
+
} finally {
lock.writeLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/091e48cb/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportServiceImpl.java
index 0a65d84..ddde2ac 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportServiceImpl.java
@@ -100,4 +100,5 @@ public class ImportServiceImpl implements ImportService{
if(!importerMap.containsKey(type)) throw new MarmottaImportException("no importer defined for type "+type);
return importerMap.get(type);
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/091e48cb/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
new file mode 100644
index 0000000..b088902
--- /dev/null
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.marmotta.platform.core.services.importer;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.event.Observes;
+import javax.inject.Inject;
+
+import org.apache.marmotta.platform.core.api.config.ConfigurationService;
+import org.apache.marmotta.platform.core.api.importer.ImportService;
+import org.apache.marmotta.platform.core.api.importer.ImportWatchService;
+import org.apache.marmotta.platform.core.api.triplestore.ContextService;
+import org.apache.marmotta.platform.core.api.user.UserService;
+import org.apache.marmotta.platform.core.events.SystemStartupEvent;
+import org.apache.marmotta.platform.core.exception.io.MarmottaImportException;
+import org.openrdf.rio.Rio;
+import org.slf4j.Logger;
+
+/**
+ * Implementation for watching import directory
+ *
+ * @author Sergio Fernández
+ *
+ */
+@ApplicationScoped
+public class ImportWatchServiceImpl implements ImportWatchService {
+
+ @Inject
+ private Logger log;
+
+ @Inject
+ private ConfigurationService configurationService;
+
+ @Inject
+ private ImportService importService;
+
+ @Inject
+ private ContextService contextService;
+
+ @Inject
+ private UserService userService;
+
+ private String path;
+
+ public ImportWatchServiceImpl() {
+
+ }
+
+ @Override
+ public void initialize(@Observes SystemStartupEvent event) {
+ this.path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
+ try {
+ Path path = Paths.get(this.path);
+ WatchService watchService = path.getFileSystem().newWatchService();
+ path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
+ while (true) {
+ final WatchKey key = watchService.take();
+ for (WatchEvent<?> watchEvent : key.pollEvents()) {
+ if (StandardWatchEventKinds.ENTRY_CREATE.equals(watchEvent.kind())) { //TODO: is it necessary?
+ @SuppressWarnings("unchecked") final Path item = ((WatchEvent<Path>) watchEvent).context();
+ if (execImport(item)) {
+ log.info("Sucessfully imported file '{}'!", item.toString());
+ Files.delete(item);
+ }
+ }
+ }
+ if (!key.reset()) {
+ // exit loop if the key is not valid
+ // e.g. if the directory was deleted
+ break;
+ }
+ }
+ } catch (IOException e) {
+ log.error("Error registering the import watch service over '{}': {}", this.path, e.getMessage());
+ } catch (InterruptedException e) {
+ log.error("Import watch service has been interrupted");
+ }
+ }
+
+ @Override
+ public boolean execImport(Path item) {
+ try {
+ importService.importData(Files.newInputStream(item),
+ Rio.getParserFormatForFileName(item.toString()).getDefaultMIMEType(),
+ userService.getAdminUser(),
+ contextService.getDefaultContext());
+ return true;
+ } catch (MarmottaImportException e) {
+ log.error("Error importing file {} from the local directory: {}", item.toString(), e.getMessage());
+ return false;
+ } catch (IOException e) {
+ e.printStackTrace();
+ log.error("Error retrieving file {} from the local directory: {}", item.toString(), e.getMessage());
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/091e48cb/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/rdf/RDFImporterImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/rdf/RDFImporterImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/rdf/RDFImporterImpl.java
index c51c49e..06d869a 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/rdf/RDFImporterImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/rdf/RDFImporterImpl.java
@@ -62,7 +62,6 @@ public class RDFImporterImpl implements Importer {
@Inject
private Logger log;
-
@Inject
private ConfigurationService configurationService;
@@ -76,9 +75,6 @@ public class RDFImporterImpl implements Importer {
private List<String> acceptTypes;
-
-
-
/**
* Get a collection of all mime types accepted by this io. Used for automatically
* selecting the appropriate io in ImportService.
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/091e48cb/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/servlet/MarmottaPostStartupFilter.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/servlet/MarmottaPostStartupFilter.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/servlet/MarmottaPostStartupFilter.java
index 6c27bb9..b7095a5 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/servlet/MarmottaPostStartupFilter.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/servlet/MarmottaPostStartupFilter.java
@@ -30,10 +30,8 @@ import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import org.apache.marmotta.platform.core.api.config.ConfigurationService;
-import org.apache.marmotta.platform.core.api.user.UserService;
import org.apache.marmotta.platform.core.events.SystemStartupEvent;
import org.apache.marmotta.platform.core.model.module.ModuleConfiguration;
-
import org.slf4j.Logger;
/**
@@ -50,9 +48,6 @@ public class MarmottaPostStartupFilter implements Filter {
private ConfigurationService configurationService;
@Inject
- private UserService userService;
-
- @Inject
private ModuleConfiguration moduleConfiguration;
@Inject @Any
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/091e48cb/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/startup/MarmottaStartupService.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/startup/MarmottaStartupService.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/startup/MarmottaStartupService.java
index ca45f40..f0b4ca5 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/startup/MarmottaStartupService.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/startup/MarmottaStartupService.java
@@ -253,6 +253,7 @@ public class MarmottaStartupService {
configurationService.setInitialising(false);
startupEvent.fire(new SystemStartupEvent());
+
} finally {
lock.unlock();
}
@@ -270,4 +271,5 @@ public class MarmottaStartupService {
public boolean isHostStarted() {
return hostStarted;
}
+
}
[2/3] git commit: MARMOTTA-145: integrated import watch service with
the task manager
Posted by wi...@apache.org.
MARMOTTA-145: integrated import watch service with the task manager
Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/dc2b5acc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/dc2b5acc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/dc2b5acc
Branch: refs/heads/develop
Commit: dc2b5accaf6dcb10745f0b0501be3c9d19a9a79e
Parents: 091e48c
Author: Sergio Fernández <wi...@apache.org>
Authored: Tue Aug 27 08:49:58 2013 +0200
Committer: Sergio Fernández <wi...@apache.org>
Committed: Tue Aug 27 08:49:58 2013 +0200
----------------------------------------------------------------------
.../importer/ImportWatchServiceImpl.java | 84 +++++++++++++-------
.../core/services/io/MarmottaIOServiceImpl.java | 5 +-
.../platform/core/services/task/TaskImpl.java | 2 +-
.../core/webservices/io/ImportWebService.java | 2 +
4 files changed, 61 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/dc2b5acc/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
index b088902..ed50ffc 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
@@ -26,6 +26,7 @@ import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.Date;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
@@ -34,6 +35,8 @@ import javax.inject.Inject;
import org.apache.marmotta.platform.core.api.config.ConfigurationService;
import org.apache.marmotta.platform.core.api.importer.ImportService;
import org.apache.marmotta.platform.core.api.importer.ImportWatchService;
+import org.apache.marmotta.platform.core.api.task.Task;
+import org.apache.marmotta.platform.core.api.task.TaskManagerService;
import org.apache.marmotta.platform.core.api.triplestore.ContextService;
import org.apache.marmotta.platform.core.api.user.UserService;
import org.apache.marmotta.platform.core.events.SystemStartupEvent;
@@ -50,10 +53,15 @@ import org.slf4j.Logger;
@ApplicationScoped
public class ImportWatchServiceImpl implements ImportWatchService {
+ private static final String TASK_GROUP = "ImportWatch";
+
@Inject
private Logger log;
@Inject
+ private TaskManagerService taskManagerService;
+
+ @Inject
private ConfigurationService configurationService;
@Inject
@@ -73,33 +81,55 @@ public class ImportWatchServiceImpl implements ImportWatchService {
@Override
public void initialize(@Observes SystemStartupEvent event) {
- this.path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
- try {
- Path path = Paths.get(this.path);
- WatchService watchService = path.getFileSystem().newWatchService();
- path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
- while (true) {
- final WatchKey key = watchService.take();
- for (WatchEvent<?> watchEvent : key.pollEvents()) {
- if (StandardWatchEventKinds.ENTRY_CREATE.equals(watchEvent.kind())) { //TODO: is it necessary?
- @SuppressWarnings("unchecked") final Path item = ((WatchEvent<Path>) watchEvent).context();
- if (execImport(item)) {
- log.info("Sucessfully imported file '{}'!", item.toString());
- Files.delete(item);
- }
- }
- }
- if (!key.reset()) {
- // exit loop if the key is not valid
- // e.g. if the directory was deleted
- break;
- }
- }
- } catch (IOException e) {
- log.error("Error registering the import watch service over '{}': {}", this.path, e.getMessage());
- } catch (InterruptedException e) {
- log.error("Import watch service has been interrupted");
- }
+ final String import_watch_path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
+ this.path = import_watch_path;
+
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ final Task task = taskManagerService.createTask("Directory import watch", TASK_GROUP);
+ task.updateMessage("watching...");
+ task.updateDetailMessage("path", import_watch_path);
+
+ try {
+ Path path = Paths.get(import_watch_path);
+ WatchService watchService = path.getFileSystem().newWatchService();
+ path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
+ while (true) {
+ final WatchKey key = watchService.take();
+ for (WatchEvent<?> watchEvent : key.pollEvents()) {
+ if (StandardWatchEventKinds.ENTRY_CREATE.equals(watchEvent.kind())) { //TODO: is it necessary?
+ @SuppressWarnings("unchecked") final Path item = ((WatchEvent<Path>) watchEvent).context();
+ task.updateMessage("importing...");
+ task.updateDetailMessage("path", item.toString());
+ if (execImport(item)) {
+ log.info("Sucessfully imported file '{}'!", item.toString());
+ Files.delete(item);
+ }
+ task.updateMessage("watching...");
+ task.updateDetailMessage("path", import_watch_path);
+ }
+ }
+ if (!key.reset()) {
+ // exit loop if the key is not valid
+ // e.g. if the directory was deleted
+ break;
+ }
+ }
+ } catch (IOException e) {
+ log.error("Error registering the import watch service over '{}': {}", import_watch_path, e.getMessage());
+ } catch (InterruptedException e) {
+ log.error("Import watch service has been interrupted");
+ }
+ }
+ };
+
+ Thread t = new Thread(r);
+ t.setName(TASK_GROUP+"(start:" + new Date() + ",path:" + this.path + ")");
+ t.setDaemon(true);
+ t.start();
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/dc2b5acc/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java
index f96de33..3c8d9b6 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/io/MarmottaIOServiceImpl.java
@@ -65,12 +65,8 @@ public class MarmottaIOServiceImpl implements MarmottaIOService {
producedTypes.addAll(format.getMIMETypes());
}
log.info(" - available writers: {}", Arrays.toString(producedTypes.toArray()));
-
-
-
}
-
/**
* returns a list of all mimetypes which can be parsed by implemented parsers
* @return
@@ -108,4 +104,5 @@ public class MarmottaIOServiceImpl implements MarmottaIOService {
public RDFFormat getParser(String mimetype) {
return parserRegistry.getFileFormatForMIMEType(mimetype);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/dc2b5acc/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java
index 56db715..f0ce2b5 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/task/TaskImpl.java
@@ -20,7 +20,6 @@ package org.apache.marmotta.platform.core.services.task;
import org.apache.marmotta.platform.core.api.task.Task;
import org.apache.marmotta.platform.core.api.task.TaskManagerService;
-
class TaskImpl extends Task {
private static final String WAITING_DETAIL = "Waiting in status";
@@ -49,4 +48,5 @@ class TaskImpl extends Task {
public void subTaskEnded() {
updateMessage(detailMessages.remove(WAITING_DETAIL));
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/dc2b5acc/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java
index 26ca893..9b0b36d 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/webservices/io/ImportWebService.java
@@ -272,6 +272,7 @@ public class ImportWebService {
}
protected static class Status {
+
boolean isRunning;
String status;
String message;
@@ -307,6 +308,7 @@ public class ImportWebService {
public void setStatus(String status) {
this.status = status;
}
+
}
}
[3/3] git commit: MARMOTTA-145: fixed nio usage
Posted by wi...@apache.org.
MARMOTTA-145: fixed nio usage
Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/14c3cf48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/14c3cf48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/14c3cf48
Branch: refs/heads/develop
Commit: 14c3cf481cbf9779f89da39ff01562a594737bfe
Parents: dc2b5ac
Author: Sergio Fernández <wi...@apache.org>
Authored: Tue Aug 27 09:21:20 2013 +0200
Committer: Sergio Fernández <wi...@apache.org>
Committed: Tue Aug 27 09:21:20 2013 +0200
----------------------------------------------------------------------
.../importer/ImportWatchServiceImpl.java | 145 ++++++++++---------
1 file changed, 76 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/14c3cf48/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
----------------------------------------------------------------------
diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
index ed50ffc..dee9081 100644
--- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
+++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/importer/ImportWatchServiceImpl.java
@@ -48,96 +48,103 @@ import org.slf4j.Logger;
* Implementation for watching import directory
*
* @author Sergio Fernández
- *
+ *
*/
@ApplicationScoped
public class ImportWatchServiceImpl implements ImportWatchService {
-
+
private static final String TASK_GROUP = "ImportWatch";
-
+
@Inject
- private Logger log;
-
+ private Logger log;
+
@Inject
private TaskManagerService taskManagerService;
-
+
@Inject
- private ConfigurationService configurationService;
-
+ private ConfigurationService configurationService;
+
@Inject
- private ImportService importService;
-
+ private ImportService importService;
+
@Inject
private ContextService contextService;
-
+
@Inject
private UserService userService;
-
+
private String path;
-
+
+ private int count;
+
public ImportWatchServiceImpl() {
-
+ count = 0;
}
-
+
@Override
- public void initialize(@Observes SystemStartupEvent event) {
- final String import_watch_path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
+ public void initialize(@Observes SystemStartupEvent event) {
+ final String import_watch_path = configurationService.getHome() + File.separator + ConfigurationService.DIR_IMPORT;
this.path = import_watch_path;
-
- Runnable r = new Runnable() {
-
- @Override
- public void run() {
- final Task task = taskManagerService.createTask("Directory import watch", TASK_GROUP);
- task.updateMessage("watching...");
- task.updateDetailMessage("path", import_watch_path);
-
- try {
- Path path = Paths.get(import_watch_path);
- WatchService watchService = path.getFileSystem().newWatchService();
- path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
- while (true) {
- final WatchKey key = watchService.take();
- for (WatchEvent<?> watchEvent : key.pollEvents()) {
- if (StandardWatchEventKinds.ENTRY_CREATE.equals(watchEvent.kind())) { //TODO: is it necessary?
- @SuppressWarnings("unchecked") final Path item = ((WatchEvent<Path>) watchEvent).context();
- task.updateMessage("importing...");
- task.updateDetailMessage("path", item.toString());
- if (execImport(item)) {
- log.info("Sucessfully imported file '{}'!", item.toString());
- Files.delete(item);
- }
- task.updateMessage("watching...");
- task.updateDetailMessage("path", import_watch_path);
- }
- }
- if (!key.reset()) {
- // exit loop if the key is not valid
- // e.g. if the directory was deleted
- break;
- }
- }
- } catch (IOException e) {
- log.error("Error registering the import watch service over '{}': {}", import_watch_path, e.getMessage());
- } catch (InterruptedException e) {
- log.error("Import watch service has been interrupted");
- }
- }
- };
-
- Thread t = new Thread(r);
- t.setName(TASK_GROUP+"(start:" + new Date() + ",path:" + this.path + ")");
- t.setDaemon(true);
- t.start();
-
- }
-
+
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ final Task task = taskManagerService.createTask("Directory import watch", TASK_GROUP);
+ task.updateMessage("watching...");
+ task.updateDetailMessage("path", import_watch_path);
+
+ try {
+ Path dir = Paths.get(path);
+ WatchService watchService = dir.getFileSystem().newWatchService();
+ dir.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
+ while (true) {
+ final WatchKey key = watchService.take();
+ for (WatchEvent<?> event : key.pollEvents()) {
+ if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { // TODO: is it necessary?
+ @SuppressWarnings("unchecked")
+ Path item = dir.resolve(((WatchEvent<Path>) event).context());
+ log.debug("Importing '{}'...", item.toString());
+ task.updateMessage("importing...");
+ task.updateDetailMessage("path", item.toString());
+ if (execImport(item)) {
+ log.info("Sucessfully imported file '{}'!", item.toString());
+ Files.delete(item);
+ }
+ task.updateProgress(++count);
+ task.updateMessage("watching...");
+ task.updateDetailMessage("path", import_watch_path);
+ }
+ }
+ if (!key.reset()) {
+ // exit loop if the key is not valid
+ // e.g. if the directory was deleted
+ break;
+ }
+ }
+ } catch (IOException e) {
+ log.error(
+ "Error registering the import watch service over '{}': {}",
+ import_watch_path, e.getMessage());
+ } catch (InterruptedException e) {
+ log.error("Import watch service has been interrupted");
+ }
+ }
+ };
+
+ Thread t = new Thread(r);
+ t.setName(TASK_GROUP + "(start:" + new Date() + ",path:" + this.path + ")");
+ t.setDaemon(true);
+ t.start();
+
+ }
+
@Override
public boolean execImport(Path item) {
try {
- importService.importData(Files.newInputStream(item),
- Rio.getParserFormatForFileName(item.toString()).getDefaultMIMEType(),
- userService.getAdminUser(),
+ importService.importData(Files.newInputStream(item), Rio
+ .getParserFormatForFileName(item.toString())
+ .getDefaultMIMEType(), userService.getAdminUser(),
contextService.getDefaultContext());
return true;
} catch (MarmottaImportException e) {