You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@isis.apache.org by da...@apache.org on 2014/07/03 17:01:10 UTC
[04/63] [abbrv] [partial] ISIS-832: mothballing components
http://git-wip-us.apache.org/repos/asf/isis/blob/680f0c8d/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileContent.java
----------------------------------------------------------------------
diff --git a/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileContent.java b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileContent.java
new file mode 100644
index 0000000..c794dbf
--- /dev/null
+++ b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileContent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.objectstore.nosql.db.file.server;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Immutable value holder for one stored entry (command, type, id, versions
+ * and payload) that knows how to serialise itself to an output stream.
+ */
+class FileContent {
+
+    /** Character set used when turning the header fields and payload into bytes. */
+    private static final String ENCODING = "utf-8";
+
+    final char command;
+    final String id;
+    final String currentVersion;
+    final String newVersion;
+    final String data;
+    final String type;
+
+    /**
+     * Stores the supplied values unchanged; {@code buf} becomes the payload
+     * held in {@link #data}.
+     */
+    public FileContent(final char command, final String id, final String currentVersion, final String newVersion, final String type, final String buf) {
+        this.command = command;
+        this.type = type;
+        this.id = id;
+        this.currentVersion = currentVersion;
+        this.newVersion = newVersion;
+        this.data = buf;
+    }
+
+    /**
+     * Writes a header line of the form {@code <type> <id> <newVersion>},
+     * terminated by a newline, followed by the payload. Neither
+     * {@code command} nor {@code currentVersion} is written.
+     *
+     * @param output stream to write to; it is neither flushed nor closed here
+     * @throws IOException if the stream rejects a write
+     */
+    public void write(final OutputStream output) throws IOException {
+        final String[] headerFields = { type, id, newVersion };
+        for (int i = 0; i < headerFields.length; i++) {
+            if (i > 0) {
+                // single space between consecutive header fields
+                output.write(' ');
+            }
+            output.write(headerFields[i].getBytes(ENCODING));
+        }
+        output.write('\n');
+        output.write(data.getBytes(ENCODING));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/680f0c8d/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileServer.java
----------------------------------------------------------------------
diff --git a/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileServer.java b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileServer.java
new file mode 100644
index 0000000..fe6c832
--- /dev/null
+++ b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileServer.java
@@ -0,0 +1,686 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.objectstore.nosql.db.file.server;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.SystemConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.isis.core.commons.lang.ObjectExtensions;
+import org.apache.isis.objectstore.nosql.NoSqlStoreException;
+
+public class FileServer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileServer.class);
+ // Fallback host used when the relevant "fileserver.*host" property is not configured.
+ private static final String DEFAULT_HOST = "localhost";
+ // Fallback ports for the service, control and sync listeners respectively.
+ private static final int DEFAULT_SERVICE_PORT = 9100;
+ private static final int DEFAULT_CONTROL_PORT = 9101;
+ private static final int DEFAULT_SYNC_PORT = 9102;
+ // Backlog handed to the service ServerSocket.
+ // NOTE(review): per the ServerSocket API a value of 0 should select the implementation default - confirm.
+ private static final int BACKLOG = 0;
+ // Message-type bytes of the sync protocol: INIT opens a session, RECOVERY_LOG precedes each transferred log file.
+ private static final int INIT = 1;
+ private static final int RECOVERY_LOG = 2;
+
+    /**
+     * Command-line entry point. Parses the {@code -h/--help} and
+     * {@code -m/--mode} options and starts the server in the requested mode
+     * ({@code normal} when no mode is given). An unrecognised mode prints
+     * the usage text without creating a server.
+     *
+     * @param args command-line arguments; non-option arguments are passed on
+     *             to the recovery and archive modes
+     * @throws IOException declared for callers; not thrown directly here
+     * @throws ParseException if the command line cannot be parsed
+     */
+    public static void main(final String[] args) throws IOException, ParseException {
+        final Options options = new Options();
+        options.addOption("h", "help", false, "Show this help");
+        options.addOption("m", "mode", true, "mode: normal | secondary | recovery | archive");
+
+        final CommandLine cmd = new BasicParser().parse(options, args);
+        if (cmd.hasOption('h')) {
+            printHelp(options);
+            return;
+        }
+
+        final String mode = cmd.getOptionValue("m");
+        final List<String> argList = ObjectExtensions.asT(cmd.getArgList());
+
+        // The server is only constructed once the mode is known to be valid.
+        if (mode == null || "normal".equals(mode)) {
+            new FileServer().startNormal();
+        } else if ("secondary".equals(mode)) {
+            new FileServer().startSecondary();
+        } else if ("archive".equals(mode)) {
+            new FileServer().startArchive(argList);
+        } else if ("recovery".equals(mode)) {
+            new FileServer().startRecovery(argList);
+        } else {
+            printHelp(options);
+        }
+    }
+
+    /**
+     * Prints the command-line usage text for the given options.
+     *
+     * @param options the options to describe
+     */
+    private static void printHelp(final Options options) {
+        final HelpFormatter help = new HelpFormatter();
+        // Program name corrected from the misspelt "FileSever".
+        help.printHelp("FileServer [OPTIONS] [FIRST RECOVERY FILES] [LAST RECOVERY FILES]", options);
+    }
+
+    private FileServerProcessor server;
+    private CompositeConfiguration config;
+
+    // The fields below are written by one thread and polled by others (the
+    // control thread flips the flags; the service, sync and log-rolling
+    // threads loop on them), so they are volatile to guarantee visibility.
+    // Loop flag for all listener/worker loops; cleared by the "shutdown" control command.
+    private volatile boolean awaitConnections = true;
+    // Set by "quiesce" and cleared by "resume"; while true the service and sync loops idle.
+    private volatile boolean isQuiescent = false;
+    // Count of serviced connections; incremented in serviceConnection and reported by "status".
+    private volatile long requests;
+
+    /**
+     * Configures logging from {@code config/logging.properties}, loads the
+     * server configuration (system properties first, then
+     * {@code config/server.properties}), hands the configured directories to
+     * {@code Util} and creates the request processor.
+     * <p>
+     * If the configuration cannot be loaded the error is logged, its message
+     * is printed and the JVM exits with a non-zero status.
+     */
+    public FileServer() {
+        org.apache.log4j.PropertyConfigurator.configure("config/logging.properties");
+
+        try {
+            config = new CompositeConfiguration();
+            config.addConfiguration(new SystemConfiguration());
+            config.addConfiguration(new PropertiesConfiguration("config/server.properties"));
+
+            final String data = config.getString("fileserver.data");
+            final String services = config.getString("fileserver.services");
+            final String logs = config.getString("fileserver.logs");
+            final String archive = config.getString("fileserver.archive");
+
+            Util.setDirectory(data, services, logs, archive);
+            server = new FileServerProcessor();
+        } catch (final ConfigurationException e) {
+            LOG.error("configuration failure", e);
+            System.out.println(e.getMessage());
+            // Exit status must be non-zero so callers can detect the failed start.
+            System.exit(1);
+        }
+    }
+
+ private void startNormal() {
+ new Thread("control") {
+ @Override
+ public void run() {
+ startControl();
+ };
+ }.start();
+ new Thread("service") {
+ @Override
+ public void run() {
+ startService();
+ };
+ }.start();
+ new Thread("log-rolling") {
+ @Override
+ public void run() {
+ startLogRolling();
+ }
+ }.start();
+ if (config.getBoolean("fileserver.sync", false)) {
+ new Thread("sync") {
+ @Override
+ public void run() {
+ startSyncing();
+ };
+ }.start();
+ } else {
+ LOG.info("not syncing to secondary server");
+ }
+
+ }
+
+    /**
+     * Runs the main file service: binds the service socket, replays the most
+     * recent recovery log (if one exists), starts the processor and then
+     * accepts and services connections one at a time until shutdown is
+     * requested.
+     * <p>
+     * Any failure while setting up is logged and terminates the JVM with a
+     * non-zero status. While the server is quiescent no connections are
+     * accepted. The server socket is closed when the loop ends.
+     */
+    private void startService() {
+        final String serviceHost = config.getString("fileserver.host", DEFAULT_HOST);
+        final int servicePort = config.getInt("fileserver.port", DEFAULT_SERVICE_PORT);
+        final int connectionTimeout = config.getInt("fileserver.connection.timeout", 5000);
+        final int readTimeout = config.getInt("fileserver.read.timeout", 5000);
+
+        ServerSocket socket = null;
+        try {
+            LOG.debug("setting up service socket on " + serviceHost + ":" + servicePort);
+            final InetAddress address = InetAddress.getByName(serviceHost);
+            socket = new ServerSocket(servicePort, BACKLOG, address);
+            // accept() times out periodically so the loop can re-check the shutdown flag
+            socket.setSoTimeout(connectionTimeout);
+            LOG.info("file service listenting on " + socket.getInetAddress().getHostAddress() + " port " + socket.getLocalPort());
+            LOG.debug("file service listenting on " + socket);
+            final LogRange logFileRange = Util.logFileRange();
+            if (!logFileRange.noLogFile()) {
+                final long lastRecoveryFile = logFileRange.getLast();
+                final File file = Util.logFile(lastRecoveryFile);
+                LOG.info("replaying last recovery file: " + file.getAbsolutePath());
+                recover(file);
+            }
+            server.startup();
+        } catch (final UnknownHostException e) {
+            LOG.error("Unknown host " + serviceHost, e);
+            System.exit(1);
+        } catch (final IOException e) {
+            LOG.error("start failure - networking not set up for " + serviceHost, e);
+            System.exit(1);
+        } catch (final RuntimeException e) {
+            LOG.error("start failure", e);
+            System.exit(1);
+        }
+
+        try {
+            do {
+                try {
+                    // Idle while quiescent, but wake up if shutdown is requested meanwhile.
+                    while (isQuiescent && awaitConnections) {
+                        try {
+                            Thread.sleep(300);
+                        } catch (final InterruptedException ignore) {
+                            // deliberately ignored: the loop condition decides when to stop
+                        }
+                    }
+                    if (!awaitConnections) {
+                        break;
+                    }
+                    final Socket connection = socket.accept();
+                    LOG.debug("connection from " + connection);
+                    connection.setSoTimeout(readTimeout);
+                    serviceConnection(connection, readTimeout);
+                } catch (final SocketTimeoutException expected) {
+                    // no client connected within the timeout; loop to re-check the flags
+                } catch (final IOException e) {
+                    LOG.error("networking problem", e);
+                }
+            } while (awaitConnections);
+        } finally {
+            if (socket != null) {
+                try {
+                    socket.close();
+                } catch (final IOException e) {
+                    LOG.warn("failure to close service socket", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Handles a single client connection: wraps its streams in a
+     * {@code ServerConnection}, counts the request, lets the processor handle
+     * it and always closes the socket afterwards. Failures are logged, never
+     * propagated.
+     *
+     * @param connection  the accepted client socket
+     * @param readTimeout read timeout in milliseconds, used only for the
+     *                    timeout log message
+     */
+    private void serviceConnection(final Socket connection, final int readTimeout) {
+        try {
+            final ServerConnection pipe = new ServerConnection(connection.getInputStream(), connection.getOutputStream());
+            requests++;
+            server.process(pipe);
+            pipe.logComplete();
+        } catch (final NoSqlStoreException e) {
+            final boolean timedOut = e.getCause() instanceof SocketTimeoutException;
+            if (!timedOut) {
+                LOG.error("file server failure", e);
+            } else {
+                LOG.error("read timed out after " + (readTimeout / 1000.0) + " seconds", e);
+            }
+        } catch (final IOException e) {
+            LOG.error("networking failure", e);
+        } catch (final RuntimeException e) {
+            LOG.error("request failure", e);
+        } finally {
+            try {
+                connection.close();
+            } catch (final IOException e) {
+                LOG.warn("failure to close connection", e);
+            }
+        }
+    }
+
+    /**
+     * Pushes completed recovery logs to the secondary server until shutdown
+     * is requested.
+     * <p>
+     * Protocol, as implemented here: send the {@code INIT} byte, read the id
+     * of the last log the secondary already holds, then for each following
+     * log file that exists and is reported as written send
+     * {@code RECOVERY_LOG}, the log id, the file as length-prefixed chunks, a
+     * zero length as terminator, and finally the CRC32 of everything sent
+     * since the checksum was reset. Connection problems are logged and the
+     * connection is retried after a pause.
+     * <p>
+     * The socket and each log file stream are closed once done with. An
+     * unresolvable sync host terminates the JVM with a non-zero status.
+     */
+    private void startSyncing() {
+        final String syncHost = config.getString("fileserver.sync-host", DEFAULT_HOST);
+        final int syncPort = config.getInt("fileserver.sync-port", DEFAULT_SYNC_PORT);
+        final int connectionTimeout = config.getInt("fileserver.connection.timeout", 5000);
+
+        LOG.info("preparing to sync to secondary server on " + syncHost + " port " + syncPort);
+
+        final InetAddress address;
+        try {
+            address = InetAddress.getByName(syncHost);
+        } catch (final UnknownHostException e) {
+            LOG.error("Unknown host " + syncHost, e);
+            System.exit(1);
+            return;
+        }
+
+        while (awaitConnections) {
+            Socket socket = null;
+            try {
+                socket = new Socket(address, syncPort);
+                LOG.info("sync connected to " + socket.getInetAddress().getHostAddress() + " port " + socket.getLocalPort());
+
+                final CRC32 crc32 = new CRC32();
+                final DataOutput output = new DataOutputStream(new CheckedOutputStream(socket.getOutputStream(), crc32));
+                final DataInput input = new DataInputStream(socket.getInputStream());
+                output.writeByte(INIT);
+                long logId = input.readLong();
+                do {
+                    final long nextLogId = logId + 1;
+                    final File file = Util.logFile(nextLogId);
+                    if (file.exists() && server.getLogger().isWritten(nextLogId)) {
+                        logId++;
+
+                        output.writeByte(RECOVERY_LOG);
+                        // checksum covers the log id and all chunk data that follow
+                        crc32.reset();
+                        output.writeLong(logId);
+
+                        LOG.info("sending recovery file: " + file.getName());
+                        final BufferedInputStream fileInput = new BufferedInputStream(new FileInputStream(file));
+                        try {
+                            final byte[] buffer = new byte[8092];
+                            int read;
+                            while ((read = fileInput.read(buffer)) > 0) {
+                                output.writeInt(read);
+                                output.write(buffer, 0, read);
+                            }
+                        } finally {
+                            // previously leaked one file handle per transferred log
+                            fileInput.close();
+                        }
+                        output.writeInt(0);
+
+                        output.writeLong(crc32.getValue());
+                    }
+                    try {
+                        Thread.sleep(300);
+                    } catch (final InterruptedException ignore) {
+                        // deliberately ignored: the loop condition decides when to stop
+                    }
+
+                    // Idle while quiescent, but wake up if shutdown is requested meanwhile.
+                    while (isQuiescent && awaitConnections) {
+                        try {
+                            Thread.sleep(300);
+                        } catch (final InterruptedException ignore) {
+                            // deliberately ignored
+                        }
+                    }
+                } while (awaitConnections);
+
+            } catch (final ConnectException e) {
+                LOG.warn("not yet connected to secondary server at " + syncHost + " port " + syncPort);
+                try {
+                    Thread.sleep(connectionTimeout);
+                } catch (final InterruptedException ignore) {
+                    // deliberately ignored
+                }
+            } catch (final IOException e) {
+                LOG.error("start failure - networking not set up for " + syncHost, e);
+                try {
+                    Thread.sleep(300);
+                } catch (final InterruptedException ignore) {
+                    // deliberately ignored
+                }
+            } catch (final RuntimeException e) {
+                LOG.error("start failure", e);
+                try {
+                    Thread.sleep(300);
+                } catch (final InterruptedException ignore) {
+                    // deliberately ignored
+                }
+            } finally {
+                // previously a new socket was leaked on every reconnect
+                if (socket != null) {
+                    try {
+                        socket.close();
+                    } catch (final IOException e) {
+                        LOG.warn("failure to close sync connection", e);
+                    }
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Runs the control listener: binds the control socket and handles one
+     * control connection at a time until shutdown is requested, then closes
+     * the socket.
+     * <p>
+     * Any failure while setting up is logged and terminates the JVM with a
+     * non-zero status.
+     */
+    private void startControl() {
+        final String controlHost = config.getString("fileserver.control-host", DEFAULT_HOST);
+        final int controlPort = config.getInt("fileserver.control-port", DEFAULT_CONTROL_PORT);
+        final int connectionTimeout = config.getInt("fileserver.connection.timeout", 5000);
+
+        ServerSocket socket = null;
+        try {
+            LOG.debug("setting up control socket on " + controlHost + ":" + controlPort);
+            final InetAddress address = InetAddress.getByName(controlHost);
+            socket = new ServerSocket(controlPort, BACKLOG, address);
+            // accept() times out periodically so the loop can re-check the shutdown flag
+            socket.setSoTimeout(connectionTimeout);
+            LOG.info("file control listenting on " + socket.getInetAddress().getHostAddress() + " port " + socket.getLocalPort());
+            LOG.debug("file control listenting on " + socket);
+        } catch (final UnknownHostException e) {
+            LOG.error("Unknown host " + controlHost, e);
+            System.exit(1);
+        } catch (final IOException e) {
+            LOG.error("start failure - networking not set up for " + controlHost, e);
+            System.exit(1);
+        } catch (final RuntimeException e) {
+            LOG.error("start failure", e);
+            System.exit(1);
+        }
+
+        try {
+            do {
+                try {
+                    final Socket connection = socket.accept();
+                    LOG.info("control connection from " + connection);
+                    controlConnection(connection);
+                } catch (final SocketTimeoutException expected) {
+                    // no client connected within the timeout; loop to re-check the flag
+                } catch (final IOException e) {
+                    LOG.error("networking problem", e);
+                }
+            } while (awaitConnections);
+        } finally {
+            if (socket != null) {
+                try {
+                    socket.close();
+                } catch (final IOException e) {
+                    LOG.warn("failure to close control socket", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Serves an interactive, line-based control session on the given socket.
+     * Recognised commands are {@code shutdown}, {@code quiesce},
+     * {@code resume}, {@code quit} and {@code status}; anything else gets a
+     * list of valid commands. {@code shutdown} and {@code quit} end the
+     * session; every other command is followed by a fresh {@code "> "}
+     * prompt. The socket is always closed on exit and failures are logged.
+     *
+     * @param connection the accepted control socket
+     */
+    private void controlConnection(final Socket connection) {
+        try {
+            final BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+            final PrintWriter print = new PrintWriter(connection.getOutputStream());
+            print.print("> ");
+            print.flush();
+
+            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+                // Commands that end the session are dealt with first.
+                if ("shutdown".equals(line)) {
+                    awaitConnections = false;
+                    print.println("Server shutdown initiated...");
+                    print.flush();
+                    server.shutdown();
+                    break;
+                }
+                if ("quit".equals(line)) {
+                    print.println("Bye");
+                    print.flush();
+                    break;
+                }
+
+                if ("quiesce".equals(line)) {
+                    isQuiescent = true;
+                    final String message = "Placing server in a quiescent state";
+                    LOG.info(message);
+                    print.println(message);
+                } else if ("resume".equals(line)) {
+                    if (!isQuiescent) {
+                        print.println("Can't resume as not currently in a quiescent state");
+                    } else {
+                        isQuiescent = false;
+                        final String message = "Resuming from a quiescent state";
+                        LOG.info(message);
+                        print.println(message);
+                    }
+                } else if ("status".equals(line)) {
+                    print.println("requests: " + requests);
+                    print.println("quiescent: " + isQuiescent);
+                } else {
+                    print.println("Unknown command, valid commands are: quit, quiesce, status, resume, shutdown");
+                }
+
+                // Every non-terminal command ends with a new prompt.
+                print.print("> ");
+                print.flush();
+            }
+        } catch (final IOException e) {
+            LOG.error("networking failure", e);
+        } catch (final RuntimeException e) {
+            LOG.error("request failure", e);
+        } finally {
+            try {
+                connection.close();
+            } catch (final IOException e) {
+                LOG.warn("failure to close connection", e);
+            }
+        }
+    }
+
+    /**
+     * Replays a range of recovery log files into the data store.
+     * <p>
+     * With no arguments only the last log file is replayed. The first
+     * argument, if present, is the id of the first file to replay; the
+     * second, if present, is the id of the last one (defaulting to the last
+     * existing log). Missing log files, non-numeric ids or ids outside the
+     * existing range print an error and terminate the JVM with a non-zero
+     * status.
+     *
+     * @param list the non-option command-line arguments
+     */
+    private void startRecovery(final List<String> list) {
+        LOG.info("starting recovery");
+        final LogRange logFileRange = Util.logFileRange();
+        if (logFileRange.noLogFile()) {
+            System.err.println("No recovery files found");
+            System.exit(1);
+        }
+        final long lastId = logFileRange.getLast();
+        LOG.info("last log file is " + Util.logFile(lastId).getName());
+
+        long startId = lastId;
+        long endId = lastId;
+
+        final int size = list.size();
+        try {
+            if (size > 0) {
+                startId = Long.parseLong(list.get(0));
+                if (size > 1) {
+                    endId = Long.parseLong(list.get(1));
+                }
+            }
+        } catch (final NumberFormatException e) {
+            // report bad input like the other validation errors instead of a stack trace
+            System.err.println("File IDs invalid: they must be numbers (" + e.getMessage() + ")");
+            System.exit(1);
+        }
+        if (startId < logFileRange.getFirst() || startId > lastId || endId > lastId) {
+            System.err.println("File IDs invalid: they must be between " + logFileRange.getFirst() + " and " + lastId);
+            System.exit(1);
+        }
+        if (startId > endId) {
+            System.err.println("File IDs invalid: start must be before the end");
+            System.exit(1);
+        }
+
+        Util.ensureDirectoryExists();
+        for (long id = startId; id <= endId; id++) {
+            final File file = Util.logFile(id);
+            LOG.info("recovering data from " + file.getName());
+            recover(file);
+        }
+        LOG.info("recovery complete");
+    }
+
+    /**
+     * Moves recovery log files to their archive location.
+     * <p>
+     * All files from the first existing log up to and including the end id
+     * are moved. The end id is the first argument if given, otherwise the
+     * log before the last one; it must be lower than the id of the last log
+     * file. Invalid input prints an error and terminates the JVM with a
+     * non-zero status. A file that cannot be moved is logged as an error and
+     * the remaining files are still processed.
+     *
+     * @param list the non-option command-line arguments
+     */
+    private void startArchive(final List<String> list) {
+        LOG.info("starting archiving");
+        final LogRange logFileRange = Util.logFileRange();
+        if (logFileRange.noLogFile()) {
+            System.err.println("No recovery files found");
+            System.exit(1);
+        }
+        final long lastId = logFileRange.getLast();
+        LOG.info("last log file is " + Util.logFile(lastId).getName());
+
+        long endId = lastId - 1;
+
+        final int size = list.size();
+        if (size > 0) {
+            try {
+                endId = Long.parseLong(list.get(0));
+            } catch (final NumberFormatException e) {
+                // report bad input like the other validation errors instead of a stack trace
+                System.err.println("File ID invalid: it must be a number (" + e.getMessage() + ")");
+                System.exit(1);
+            }
+        }
+        if (endId >= lastId) {
+            System.err.println("File ID invalid: they must be less than " + lastId);
+            System.exit(1);
+        }
+        final long startId = logFileRange.getFirst();
+        for (long id = startId; id <= endId; id++) {
+            final File file = Util.logFile(id);
+            LOG.info("moving " + file.getName());
+            final File destination = Util.archiveLogFile(id);
+            // renameTo signals failure only through its return value
+            if (!file.renameTo(destination)) {
+                LOG.error("failed to move " + file.getAbsolutePath() + " to " + destination.getAbsolutePath());
+            }
+        }
+        LOG.info("archive complete");
+
+    }
+
+    /**
+     * Runs the server as a secondary: listens on the sync host/port and
+     * processes one sync connection at a time for as long as
+     * {@code awaitConnections} is set, closing the server socket afterwards.
+     * <p>
+     * Any failure is logged and terminates the JVM with a non-zero status.
+     */
+    private void startSecondary() {
+        final String serviceHost = config.getString("fileserver.sync-host", DEFAULT_HOST);
+        final int servicePort = config.getInt("fileserver.sync-port", DEFAULT_SYNC_PORT);
+
+        Util.ensureDirectoryExists();
+        ServerSocket socket = null;
+        try {
+            LOG.debug("setting up syncing socket on " + serviceHost + ":" + servicePort);
+            final InetAddress address = InetAddress.getByName(serviceHost);
+            socket = new ServerSocket(servicePort, BACKLOG, address);
+            LOG.info("listenting on " + socket.getInetAddress().getHostAddress() + " port " + socket.getLocalPort());
+            LOG.debug("listenting on " + socket);
+            do {
+                // no read timeout is applied to sync connections
+                syncConnection(socket.accept(), 0);
+            } while (awaitConnections);
+        } catch (final UnknownHostException e) {
+            LOG.error("Unknown host " + serviceHost, e);
+            System.exit(1);
+        } catch (final IOException e) {
+            LOG.error("start failure - networking not set up for " + serviceHost, e);
+            System.exit(1);
+        } catch (final RuntimeException e) {
+            LOG.error("start failure", e);
+            System.exit(1);
+        } finally {
+            if (socket != null) {
+                try {
+                    socket.close();
+                } catch (final IOException e) {
+                    LOG.warn("failure to close syncing socket", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Receives recovery logs from a primary server over one connection.
+     * <p>
+     * Expects the {@code INIT} byte, replies with the id of the last local
+     * log (or -1 when there is none) and then repeatedly reads a
+     * {@code RECOVERY_LOG} byte, a log id, length-prefixed chunks ending with
+     * a non-positive length, and a CRC32 checksum. Each log is written to a
+     * temporary file, verified against the checksum, replayed via
+     * {@link #recover(File)} and then renamed to its final log file name.
+     * Any other leading byte ends the session. All failures are logged and
+     * the connection is always closed.
+     *
+     * @param connection  the accepted sync socket
+     * @param readTimeout currently unused
+     */
+    private void syncConnection(final Socket connection, final int readTimeout) {
+        try {
+            final CRC32 crc32 = new CRC32();
+            final DataOutput output = new DataOutputStream(connection.getOutputStream());
+            final DataInput input = new DataInputStream(new CheckedInputStream(connection.getInputStream(), crc32));
+
+            if (input.readByte() != INIT) {
+                return;
+            }
+
+            final LogRange logFileRange = Util.logFileRange();
+            final long lastId = logFileRange.noLogFile() ? -1 : logFileRange.getLast();
+            output.writeLong(lastId);
+            do {
+                if (input.readByte() != RECOVERY_LOG) {
+                    return;
+                }
+                // checksum covers the log id and all chunk framing/data that follow
+                crc32.reset();
+                final long logId = input.readLong();
+                final File file = Util.tmpLogFile(logId);
+                LOG.info("syncing recovery file: " + file.getName());
+                final BufferedOutputStream fileOutput = new BufferedOutputStream(new FileOutputStream(file));
+                try {
+                    final byte[] buffer = new byte[8092];
+                    int length;
+                    while ((length = input.readInt()) > 0) {
+                        // the length comes from the peer; never read past the buffer
+                        if (length > buffer.length) {
+                            throw new NoSqlStoreException("Invalid chunk length " + length + " during download of " + file.getName());
+                        }
+                        input.readFully(buffer, 0, length);
+                        fileOutput.write(buffer, 0, length);
+                    }
+                } finally {
+                    // close (and flush) even when the transfer fails part-way
+                    fileOutput.close();
+                }
+
+                // capture the checksum before reading the one sent by the peer
+                final long calculatedChecksum = crc32.getValue();
+                final long sentChecksum = input.readLong();
+                if (calculatedChecksum != sentChecksum) {
+                    throw new NoSqlStoreException("Checksum didn't match during download of " + file.getName());
+                }
+
+                recover(file);
+                final File renameTo = Util.logFile(logId);
+                // renameTo signals failure only through its return value
+                if (!file.renameTo(renameTo)) {
+                    LOG.error("failed to rename " + file.getAbsolutePath() + " to " + renameTo.getAbsolutePath());
+                }
+            } while (true);
+        } catch (final NoSqlStoreException e) {
+            LOG.error("file server failure", e);
+        } catch (final IOException e) {
+            LOG.error("networking failure", e);
+        } catch (final RuntimeException e) {
+            LOG.error("request failure", e);
+        } finally {
+            try {
+                connection.close();
+            } catch (final IOException e) {
+                LOG.warn("failure to close connection", e);
+            }
+        }
+
+        // TODO restart
+    }
+
+    /**
+     * Replays every transaction recorded in the given recovery log file.
+     * Each transaction must begin with a line starting
+     * {@code #transaction started}; its body is handled by
+     * {@link #readTransaction(LineNumberReader)}.
+     *
+     * @param file the recovery log to replay
+     * @throws NoSqlStoreException if a transaction start marker is missing,
+     *         or wrapping any I/O failure while reading or closing the file
+     */
+    private void recover(final File file) {
+        LineNumberReader reader = null;
+        try {
+            reader = new LineNumberReader(new InputStreamReader(new FileInputStream(file), Util.ENCODING));
+
+            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+                final boolean isTransactionStart = line.startsWith("#transaction started");
+                if (!isTransactionStart) {
+                    throw new NoSqlStoreException("No transaction start found: " + line + " (" + reader.getLineNumber() + ")");
+                }
+                readTransaction(reader);
+            }
+        } catch (final IOException e) {
+            throw new NoSqlStoreException(e);
+        } finally {
+            if (reader != null) {
+                try {
+                    reader.close();
+                } catch (final IOException e) {
+                    throw new NoSqlStoreException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Reads one transaction from the log, starting just after its start
+     * marker.
+     * <p>
+     * Header lines starting with {@code S} save a service (key and name),
+     * those starting with {@code B} save a next-batch value (name and
+     * number); in both cases the following line is skipped. Any other header
+     * starts an element whose data is collected for writing. When the
+     * {@code #transaction ended} marker is reached the collected elements
+     * are written and the following line is skipped. If the input ends first
+     * a warning is logged and the collected elements are not written.
+     *
+     * @param reader the log reader, positioned after a transaction start line
+     * @throws IOException if the log cannot be read
+     */
+    private void readTransaction(final LineNumberReader reader) throws IOException {
+        final ArrayList<FileContent> files = new ArrayList<FileContent>();
+        final DataFileWriter content = new DataFileWriter(files);
+
+        for (String header = reader.readLine(); header != null; header = reader.readLine()) {
+            if (header.startsWith("#transaction ended")) {
+                LOG.debug("transaction read in (ending " + reader.getLineNumber() + ")");
+                content.writeData();
+                reader.readLine();
+                return;
+            }
+
+            if (header.startsWith("S")) {
+                final String[] fields = header.substring(1).split(" ");
+                server.saveService(fields[0], fields[1]);
+                reader.readLine();
+                continue;
+            }
+
+            if (header.startsWith("B")) {
+                final String[] fields = header.substring(1).split(" ");
+                final String name = fields[0];
+                final long nextBatch = Long.parseLong(fields[1]);
+                server.saveNextBatch(name, nextBatch);
+                reader.readLine();
+                continue;
+            }
+
+            files.add(readElementData(header, reader));
+        }
+        LOG.warn("transaction has no ending marker so is incomplete and will not be restored (ending " + reader.getLineNumber() + ")");
+    }
+
+    /**
+     * Reads the body of one element from the log and combines it with its
+     * header into a {@link FileContent}.
+     * <p>
+     * Body lines are consumed up to (and including) the first empty line or
+     * the end of input, each kept with a trailing newline. The header's
+     * first character is the command; the rest is split on spaces into type,
+     * id and version. The current version is left {@code null}.
+     *
+     * @param header the element's header line
+     * @param reader the log reader, positioned after the header
+     * @return the parsed element
+     * @throws IOException if the log cannot be read
+     */
+    private FileContent readElementData(final String header, final LineNumberReader reader) throws IOException {
+        final StringBuilder body = new StringBuilder();
+        for (String line = reader.readLine(); line != null && line.length() != 0; line = reader.readLine()) {
+            body.append(line).append('\n');
+        }
+
+        final String[] fields = header.substring(1).split(" ");
+        final String type = fields[0];
+        final String id = fields[1];
+        final String version = fields[2];
+        final char command = header.charAt(0);
+        return new FileContent(command, id, null, version, type, body.toString());
+    }
+
+    /**
+     * Periodically asks the log writer to start a new file until shutdown is
+     * requested. The period is {@code fileserver.log-period} minutes
+     * (default 5). Nothing is rolled while the processor has no log writer
+     * yet.
+     */
+    private void startLogRolling() {
+        final int rollPeriod = config.getInt("fileserver.log-period", 5);
+        // long arithmetic: in int, periods above ~35791 minutes overflowed before widening
+        final long sleepTime = rollPeriod * 60L * 1000L;
+
+        while (awaitConnections) {
+            final LogWriter logger = server.getLogger();
+            if (logger != null) {
+                logger.startNewFile();
+            }
+            try {
+                Thread.sleep(sleepTime);
+            } catch (final InterruptedException ignore) {
+                // deliberately ignored: the loop condition decides when to stop
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/680f0c8d/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileServerException.java
----------------------------------------------------------------------
diff --git a/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileServerException.java b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileServerException.java
new file mode 100644
index 0000000..1f103e5
--- /dev/null
+++ b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileServerException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.objectstore.nosql.db.file.server;
+
+import java.io.IOException;
+
+import org.apache.isis.core.commons.exceptions.IsisException;
+
+/**
+ * Exception raised for file server failures, optionally wrapping the
+ * {@link IOException} that caused it.
+ */
+public class FileServerException extends IsisException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates an exception with a message only.
+     *
+     * @param message description of the failure
+     */
+    public FileServerException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception with a message and the underlying I/O failure.
+     *
+     * @param message description of the failure
+     * @param e       the I/O exception that caused it
+     */
+    public FileServerException(final String message, final IOException e) {
+        super(message, e);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/680f0c8d/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileServerProcessor.java
----------------------------------------------------------------------
diff --git a/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileServerProcessor.java b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileServerProcessor.java
new file mode 100644
index 0000000..09fd8ff
--- /dev/null
+++ b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/FileServerProcessor.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.objectstore.nosql.db.file.server;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.isis.objectstore.nosql.NoSqlStoreException;
+
+public class FileServerProcessor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileServerProcessor.class);
+
+ private boolean acceptNewRequests = true;
+ private LockManager locks;
+ private LogWriter logger;
+
+ public void startup() {
+ Util.ensureDirectoryExists();
+ logger = new LogWriter();
+ logger.startup();
+ locks = new LockManager();
+ }
+
+ public void shutdown() {
+ acceptNewRequests = false;
+ locks.waitUntilAllRealeased();
+ logger.shutdown();
+ }
+
+ LogWriter getLogger() {
+ return logger;
+ }
+
+ public void process(final ServerConnection connection) {
+ try {
+ if (acceptNewRequests) {
+ connection.readCommand();
+ final char command = connection.getCommand();
+ switch (command) {
+ case 'L':
+ list(connection);
+ break;
+
+ case 'R':
+ read(connection);
+ break;
+
+ case 'W':
+ write(connection);
+ break;
+
+ case 'I':
+ hasInstances(connection);
+ break;
+
+ case 'S':
+ service(connection);
+ break;
+
+ case 'T':
+ saveService(connection);
+ break;
+
+ case 'N':
+ nextSerialBatch(connection);
+ break;
+
+ case 'X':
+ status(connection);
+ break;
+
+ default:
+ LOG.warn("Unrecognised command " + command);
+ connection.error("Unrecognised command " + command);
+ }
+ } else {
+ connection.abort();
+ }
+ } catch (final Exception e) {
+ LOG.error("Request failed", e);
+ connection.error("Remote exception thrown:\n" + e.getMessage(), e);
+
+ } finally {
+ connection.close();
+ }
+ }
+
+ private void list(final ServerConnection connection) {
+ try {
+ connection.endCommand();
+ final String type = connection.getRequest();
+ int limit = connection.getRequestAsInt();
+ if (limit == 0) {
+ limit = Integer.MAX_VALUE;
+ }
+
+ final File[] listFiles = listFiles(type);
+ if (listFiles != null) {
+ connection.ok();
+ for (final File file : listFiles) {
+ final String fileName = file.getName();
+ final String id = fileName.substring(0, fileName.length() - 5);
+ final DataFileReader reader = findInstance(type, id, connection);
+ readInstance(reader, connection);
+ locks.release(id, getTransactionId());
+ if (limit-- < 0) {
+ break;
+ }
+ }
+ connection.endBlock();
+ } else {
+ connection.response("");
+ }
+
+ } catch (final IOException e) {
+ throw new NoSqlStoreException(Util.READ_ERROR, e);
+ }
+ }
+
+ private File[] listFiles(final String type) {
+ final File[] listFiles = Util.directory(type).listFiles(new FileFilter() {
+ @Override
+ public boolean accept(final File pathname) {
+ return pathname.getName().endsWith(".data");
+ }
+ });
+ return listFiles;
+ }
+
+ private void read(final ServerConnection connection) {
+ String type = null;
+ String id = null;
+ try {
+ connection.endCommand();
+ type = connection.getRequest();
+ id = connection.getRequest();
+ final DataFileReader reader = findInstance(type, id, connection);
+ if (reader == null) {
+ connection.notFound(Util.FILE_NOT_FOUND + " for " + type + "/" + id);
+ } else {
+ connection.ok();
+ readInstance(reader, connection);
+ }
+ } catch (final IOException e) {
+ throw new NoSqlStoreException(Util.READ_ERROR + " for " + type + "/" + id, e);
+ } finally {
+ locks.release(id, getTransactionId());
+ }
+
+ }
+
+ private DataFileReader findInstance(final String type, final String id, final ServerConnection connection) throws IOException {
+ LOG.debug("reading file " + id);
+ locks.acquireRead(id, getTransactionId());
+ try {
+ return new DataFileReader(type, id);
+ } catch (final FileNotFoundException e) {
+ LOG.error(Util.FILE_NOT_FOUND + " for " + type + "/" + id, e);
+ return null;
+ }
+ }
+
+ private void readInstance(final DataFileReader reader, final ServerConnection connection) {
+ final String data = reader.getData();
+ reader.close();
+ connection.responseData(data);
+ }
+
+ private void write(final ServerConnection connection) {
+ List<FileContent> files = null;
+ try {
+ files = getWriteRequests(connection);
+ final String error = acquireLocks(files);
+ if (error == null) {
+ logger.logWrites(files);
+ final DataFileWriter content = new DataFileWriter(files);
+ content.writeData();
+ connection.ok();
+ } else {
+ connection.error(error);
+ }
+
+ } catch (final IOException e) {
+ throw new NoSqlStoreException("Failed to write data", e);
+ } finally {
+ if (files != null) {
+ releaseLocks(files);
+ }
+ }
+ }
+
+ private List<FileContent> getWriteRequests(final ServerConnection connection) throws IOException {
+ final ArrayList<FileContent> files = new ArrayList<FileContent>();
+ while (connection.readWriteHeaders()) {
+ final char command = connection.getCommand();
+ final String type = connection.getRequest();
+ final String id = connection.getRequest();
+ final String currentVersion = connection.getRequest();
+ final String newVersion = connection.getRequest();
+ LOG.debug("write for " + type + "@" + id + " v." + newVersion);
+
+ final String buf = connection.getData();
+ files.add(new FileContent(command, id, currentVersion, newVersion, type, buf));
+ }
+ // connection.endCommand();
+ return files;
+ }
+
+ private String acquireLocks(final List<FileContent> list) throws IOException {
+ final Thread transactionId = getTransactionId();
+ for (final FileContent item : list) {
+ if (!locks.acquireWrite(item.id, transactionId)) {
+ return item.type + " being changed by another user, please try again\n" + item.data;
+ }
+ if (Util.shouldFileExist(item.command)) {
+ final DataFileReader dataReader = new DataFileReader(item.type, item.id);
+ final String version = dataReader.getVersion();
+ if (!version.equals(item.currentVersion)) {
+ // String data = dataReader.getData();
+ dataReader.close();
+ return "mismatch between FileContent version (" + item.currentVersion + ") and DataReader version (" + version + ")";
+ }
+ dataReader.close();
+ }
+ }
+ return null;
+ }
+
+ private void releaseLocks(final List<FileContent> list) {
+ final Thread transactionId = getTransactionId();
+ for (final FileContent item : list) {
+ locks.release(item.id, transactionId);
+ }
+ }
+
+ private Thread getTransactionId() {
+ return Thread.currentThread();
+ }
+
+ private void status(final ServerConnection connection) throws IOException {
+ connection.endCommand();
+ final String request = connection.getRequest();
+ if (request.equals("contains-data")) {
+ connection.response(Util.isPopulated());
+
+ } else {
+ connection.error("Unrecognised command " + request);
+ }
+ }
+
+ private void service(final ServerConnection connection) {
+ connection.endCommand();
+ final String name = connection.getRequest();
+ final File file = Util.serviceFile(name);
+ if (file.exists()) {
+ final String id = readServiceFile(file);
+ connection.response(id);
+ } else {
+ connection.response("null");
+ }
+ }
+
+ private String readServiceFile(final File file) {
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), Util.ENCODING));
+ final String[] split = reader.readLine().split(" ");
+ return split[1];
+ } catch (final IOException e) {
+ LOG.error("failed to read service file", e);
+ throw new FileServerException("Failed to read service file", e);
+ } finally {
+ try {
+ reader.close();
+ } catch (final IOException e) {
+ LOG.error("failed to close file", e);
+ }
+ }
+ }
+
+ private void saveService(final ServerConnection connection) throws IOException {
+ connection.endCommand();
+ final String name = connection.getRequest();
+ final String key = connection.getRequest();
+ logger.logServiceEntry(key, name);
+ saveService(key, name);
+ connection.ok();
+ }
+
+ void saveService(final String key, final String name) throws FileNotFoundException, IOException, UnsupportedEncodingException {
+ FileOutputStream fileOut = null;
+ final File file = Util.serviceFile(name);
+ try {
+ fileOut = new FileOutputStream(file);
+ fileOut.write(name.getBytes(Util.ENCODING));
+ fileOut.write(' ');
+ fileOut.write(key.getBytes(Util.ENCODING));
+ } finally {
+ if (fileOut != null) {
+ try {
+ fileOut.close();
+ } catch (final IOException e) {
+ throw new NoSqlStoreException(e);
+ }
+ }
+ }
+ }
+
+ private void nextSerialBatch(final ServerConnection connection) throws IOException {
+ // TODO lock file first
+
+ connection.endCommand();
+ final String name = connection.getRequest();
+ final int batchSize = connection.getRequestAsInt();
+
+ long nextId;
+ final File file = Util.serialNumberFile(name);
+ if (!file.exists()) {
+ nextId = 1;
+ LOG.info("Initial ID batch created at " + nextId);
+ } else {
+ final BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), Util.ENCODING));
+ nextId = Long.valueOf(reader.readLine()).longValue();
+ reader.close();
+ LOG.info("New ID batch allocated, from " + nextId);
+ }
+
+ final long newBatchAt = nextId + batchSize;
+ logger.logNextSerialBatch(name, newBatchAt);
+
+ saveNextBatch(file, newBatchAt);
+
+ // TODO remove lock
+
+ connection.response(nextId);
+ }
+
+ private void saveNextBatch(final File file, final long newBatchAt) throws FileNotFoundException, IOException {
+ final FileOutputStream fileOutput = new FileOutputStream(file);
+ fileOutput.write(Long.toString(newBatchAt).getBytes(Util.ENCODING));
+ fileOutput.close();
+ }
+
+ public void saveNextBatch(final String name, final long nextBatch) throws IOException {
+ saveNextBatch(Util.serialNumberFile(name), nextBatch);
+ }
+
+ private void hasInstances(final ServerConnection connection) throws IOException {
+ connection.endCommand();
+ final String type = connection.getRequest();
+ final File[] listFiles = listFiles(type);
+ final boolean hasInstances = listFiles != null && listFiles.length > 0;
+ connection.response(hasInstances);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/680f0c8d/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/Lock.java
----------------------------------------------------------------------
diff --git a/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/Lock.java b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/Lock.java
new file mode 100644
index 0000000..22818db
--- /dev/null
+++ b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/Lock.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.objectstore.nosql.db.file.server;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Records which transactions (threads) currently hold a lock: at most one
+ * writer and any number of readers. The class does no synchronisation of its
+ * own.
+ */
+class Lock {
+
+    private final List<Thread> reads = new ArrayList<Thread>();
+    private Thread write;
+
+    /** Registers the transaction as a reader. */
+    public void addRead(final Thread transaction) {
+        reads.add(transaction);
+    }
+
+    /** Registers the transaction as the writer, replacing any previous writer. */
+    public void setWrite(final Thread transaction) {
+        write = transaction;
+    }
+
+    /** @return true while a writer is registered */
+    public boolean isWriteLocked() {
+        return write != null;
+    }
+
+    /**
+     * Drops the transaction's hold: the write hold when the transaction is the
+     * current writer, otherwise one of its read holds (if it has any).
+     */
+    public void remove(final Thread transaction) {
+        if (write == transaction) {
+            write = null;
+            return;
+        }
+        reads.remove(transaction);
+    }
+
+    /** @return true when there is neither a writer nor any reader */
+    public boolean isEmpty() {
+        return !isWriteLocked() && reads.isEmpty();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/680f0c8d/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/LockManager.java
----------------------------------------------------------------------
diff --git a/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/LockManager.java b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/LockManager.java
new file mode 100644
index 0000000..a014dfa
--- /dev/null
+++ b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/LockManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.objectstore.nosql.db.file.server;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Keeps one {@link Lock} per object id and hands out read and write holds on
+ * it. All access to the lock table and to the individual locks happens while
+ * holding this manager's monitor.
+ */
+public class LockManager {
+
+    private final Map<String, Lock> locks = new HashMap<String, Lock>();
+
+    /**
+     * Registers the transaction as a reader of the id.
+     *
+     * @param id the object id being read
+     * @param transaction the thread acting as the transaction
+     */
+    public synchronized void acquireRead(final String id, final Thread transaction) {
+        getLock(id).addRead(transaction);
+    }
+
+    /**
+     * Tries to register the transaction as the writer of the id. Synchronized
+     * so that the check for an existing writer and the registration of the new
+     * one cannot be interleaved with another thread's attempt.
+     *
+     * @param id the object id being written
+     * @param transaction the thread acting as the transaction
+     * @return false when the id already has a writer, true when the hold was taken
+     */
+    public synchronized boolean acquireWrite(final String id, final Thread transaction) {
+        final Lock lock = getLock(id);
+        if (lock.isWriteLocked()) {
+            return false;
+        }
+        lock.setWrite(transaction);
+        return true;
+    }
+
+    /** Returns the lock for the id, creating and registering it when absent. */
+    private synchronized Lock getLock(final String id) {
+        Lock lock = locks.get(id);
+        if (lock == null) {
+            lock = new Lock();
+            locks.put(id, lock);
+        }
+        return lock;
+    }
+
+    /**
+     * Drops the transaction's hold on the id and forgets the lock once nobody
+     * holds it any more. Releasing an id that is not locked does nothing.
+     *
+     * @param id the object id to release
+     * @param transaction the thread acting as the transaction
+     */
+    public synchronized void release(final String id, final Thread transaction) {
+        final Lock lock = locks.get(id);
+        if (lock == null) {
+            return;
+        }
+        lock.remove(transaction);
+        if (lock.isEmpty()) {
+            locks.remove(id);
+        }
+    }
+
+    /** Currently a no-op: returns immediately without waiting for anything. */
+    public void waitUntilAllRealeased() {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/680f0c8d/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/LogRange.java
----------------------------------------------------------------------
diff --git a/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/LogRange.java b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/LogRange.java
new file mode 100644
index 0000000..594d7e8
--- /dev/null
+++ b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/LogRange.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.isis.objectstore.nosql.db.file.server;
+
+/**
+ * Tracks the smallest and largest sequence numbers that have been added.
+ * Until something is added the range reports {@link Long#MAX_VALUE} as its
+ * first and zero as its last value.
+ */
+class LogRange {
+    private static final long NO_FIRST = Long.MAX_VALUE;
+    private static final long NO_LAST = 0;
+
+    private long first = NO_FIRST;
+    private long last = NO_LAST;
+
+    /** Widens the range, where necessary, so that it includes the sequence number. */
+    public void add(final long sequence) {
+        if (sequence > last) {
+            last = sequence;
+        }
+        if (sequence < first) {
+            first = sequence;
+        }
+    }
+
+    /** @return true while both bounds still hold their initial values */
+    public boolean noLogFile() {
+        return last == NO_LAST && first == NO_FIRST;
+    }
+
+    /** @return the smallest sequence number added so far */
+    public long getFirst() {
+        return first;
+    }
+
+    /** @return the largest sequence number added so far */
+    public long getLast() {
+        return last;
+    }
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/680f0c8d/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/LogWriter.java
----------------------------------------------------------------------
diff --git a/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/LogWriter.java b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/LogWriter.java
new file mode 100644
index 0000000..10de29f
--- /dev/null
+++ b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/LogWriter.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.objectstore.nosql.db.file.server;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.isis.objectstore.nosql.NoSqlStoreException;
+
+/**
+ * Appends a record of every change (instance writes, service entries and
+ * serial number batches) to the current log file. Each record is framed by a
+ * "#transaction started" and a "#transaction ended" line and is flushed before
+ * the logging method returns.
+ */
+public class LogWriter {
+
+    private DataOutputStream writer;
+    private boolean startNewFile = false;
+    private long nextLogIdToWrite;
+
+    /**
+     * Requests that the next record goes into a fresh log file. The request is
+     * ignored while the current file is missing or empty. Synchronized because
+     * the flag it sets is read by the synchronized logging methods.
+     */
+    public synchronized void startNewFile() {
+        // don't start new file if old one is empty
+        final File file = Util.logFile(nextLogIdToWrite);
+        if (file.exists() && file.length() > 0) {
+            startNewFile = true;
+        }
+    }
+
+    /**
+     * Logs that the named serial number sequence will continue at the given value.
+     *
+     * @param name the sequence name
+     * @param newBatchAt the first number of the next batch
+     */
+    public synchronized void logNextSerialBatch(final String name, final long newBatchAt) {
+        startNewFileIfNeeded();
+        try {
+            beginTransaction();
+            writeEntry('B', name, Long.toString(newBatchAt));
+            endTransaction();
+        } catch (final IOException e) {
+            throw new NoSqlStoreException("Failed to write serial number data to log file", e);
+        }
+
+    }
+
+    /**
+     * Logs the registration of a service.
+     *
+     * @param key the key stored for the service
+     * @param name the service name
+     */
+    public synchronized void logServiceEntry(final String key, final String name) {
+        startNewFileIfNeeded();
+        try {
+            beginTransaction();
+            writeEntry('S', key, name);
+            endTransaction();
+        } catch (final IOException e) {
+            throw new NoSqlStoreException("Failed to write service entry data to log file", e);
+        }
+    }
+
+    /**
+     * Logs a set of instance writes as one transaction.
+     *
+     * @param items the contents about to be written
+     */
+    public synchronized void logWrites(final List<FileContent> items) {
+        startNewFileIfNeeded();
+        try {
+            beginTransaction();
+            for (final FileContent content : items) {
+                writer.write(content.command);
+                content.write(writer);
+                writer.write('\n');
+            }
+            endTransaction();
+        } catch (final IOException e) {
+            throw new NoSqlStoreException("Failed to write data to log file", e);
+        }
+    }
+
+    /** Writes the opening marker line, using the same encoding as the entries. */
+    private void beginTransaction() throws IOException {
+        writer.write(("#transaction started - " + new Date().toString() + "\n").getBytes(Util.ENCODING));
+    }
+
+    /** Writes the closing marker line and pushes the record out to the file. */
+    private void endTransaction() throws IOException {
+        writer.write("#transaction ended\n\n".getBytes(Util.ENCODING));
+        writer.flush();
+    }
+
+    /** Writes a command character followed by two space-separated values and a blank line. */
+    private void writeEntry(final char command, final String first, final String second) throws IOException {
+        writer.write(command);
+        writer.write(first.getBytes(Util.ENCODING));
+        writer.write(' ');
+        writer.write(second.getBytes(Util.ENCODING));
+        writer.write('\n');
+        writer.write('\n');
+    }
+
+    private void startNewFileIfNeeded() {
+        if (startNewFile) {
+            close();
+            openNewFile();
+            startNewFile = false;
+        }
+    }
+
+    private void openNewFile() {
+        nextLogIdToWrite++;
+        final File file = Util.logFile(nextLogIdToWrite);
+        if (file.exists()) {
+            throw new NoSqlStoreException("Log file already exists");
+        }
+        openFile(file);
+    }
+
+    private void openFile(final File file) {
+        try {
+            writer = new DataOutputStream(new FileOutputStream(file));
+            startNewFile = false;
+        } catch (final IOException e) {
+            throw new NoSqlStoreException("Failed to open log file", e);
+        }
+    }
+
+    /**
+     * Opens the log: continues with the most recent log file while it is
+     * missing or empty, otherwise starts the next one.
+     */
+    public void startup() {
+        nextLogIdToWrite = Util.logFileRange().getLast();
+        startNewFile();
+        if (startNewFile) {
+            openNewFile();
+        } else {
+            openFile(Util.logFile(nextLogIdToWrite));
+        }
+    }
+
+    /** Closes the current log file. */
+    public void shutdown() {
+        close();
+    }
+
+    private void close() {
+        // nothing to close when no log file was ever opened
+        if (writer == null) {
+            return;
+        }
+        try {
+            writer.close();
+        } catch (final IOException e) {
+            throw new NoSqlStoreException("Falied to close log file", e);
+        }
+    }
+
+    /**
+     * Reports whether the log file with the given id is complete, that is,
+     * whether it precedes the file currently being written. A pending request
+     * for a new file is honoured first.
+     *
+     * @param logId the id of the log file in question
+     * @return true when the id is lower than that of the current log file
+     */
+    public synchronized boolean isWritten(final long logId) {
+        startNewFileIfNeeded();
+        return logId < nextLogIdToWrite;
+    }
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/680f0c8d/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/ServerConnection.java
----------------------------------------------------------------------
diff --git a/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/ServerConnection.java b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/ServerConnection.java
new file mode 100644
index 0000000..77c61ac
--- /dev/null
+++ b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/ServerConnection.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.objectstore.nosql.db.file.server;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.isis.objectstore.nosql.NoSqlStoreException;
+import org.apache.isis.objectstore.nosql.db.file.RemotingException;
+
+public class ServerConnection {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ServerConnection.class);
+
+ private final BufferedReader reader;
+ private final PrintWriter writer;
+ private final OutputStream outputStream;
+ private final InputStream inputStream;
+ private int header;
+ private String[] headers;
+ private char command;
+
+ public ServerConnection(final InputStream input, final OutputStream output) {
+ outputStream = Util.trace(output, true);
+ inputStream = Util.trace(input, true);
+ this.reader = new BufferedReader(new InputStreamReader(inputStream, Util.ENCODING));
+ this.writer = new PrintWriter(new OutputStreamWriter(outputStream, Util.ENCODING));
+ }
+
+ public void readCommand() {
+ readHeaders();
+ }
+
+ private void logFailure() {
+ LOG.error("(failed " + inputStream + ")");
+ LOG.error("(failed " + outputStream + ")");
+ }
+
+ public void logComplete() {
+ LOG.debug("(complete " + inputStream + ")");
+ LOG.debug("(complete " + outputStream + ")");
+ }
+
+ boolean readHeaders() {
+ try {
+ final String line = reader.readLine();
+ LOG.debug("header: " + line);
+ if (line == null) {
+ logFailure();
+ throw new RemotingException("stream ended prematurely while reading header, aborting request");
+ }
+ if (line.length() == 0) {
+ return false;
+ } else {
+ command = line.charAt(0);
+ headers = line.substring(1).split(" ");
+ this.header = 0;
+ return true;
+ }
+ } catch (final IOException e) {
+ logFailure();
+ throw new NoSqlStoreException(e);
+ }
+ }
+
+ public boolean readWriteHeaders() {
+ final boolean readHeaders = readHeaders();
+ if (readHeaders && headers.length != 4) {
+ logFailure();
+ throw new RemotingException("invalid header string, aborting request");
+ }
+ return readHeaders;
+ }
+
+ public String getRequest() {
+ return headers[header++];
+ }
+
+ public int getRequestAsInt() {
+ return Integer.valueOf(getRequest()).intValue();
+ }
+
+ public char getCommand() {
+ return command;
+ }
+
+ public void endCommand() {
+ try {
+ final String line = reader.readLine();
+ if (line == null) {
+ logFailure();
+ throw new RemotingException("stream ended prematurely while reading end of command, aborting request");
+ }
+ if (line.length() > 0) {
+ logFailure();
+ throw new RemotingException("command didn't end with an empty blank line, aborting request");
+ }
+ } catch (final IOException e) {
+ logFailure();
+ throw new NoSqlStoreException(e);
+ }
+ }
+
+ /**
+ * Reads all the data up until the next blank line.
+ */
+ public String getData() {
+ try {
+ final StringBuffer buffer = new StringBuffer();
+ String line;
+ while (true) {
+ line = reader.readLine();
+ if (line == null) {
+ logFailure();
+ throw new RemotingException("stream ended prematurely while reading data, aborting request");
+ }
+ if (line.length() == 0) {
+ break;
+ }
+ buffer.append(line);
+ buffer.append('\n');
+ }
+ return buffer.toString();
+ } catch (final IOException e) {
+ logFailure();
+ throw new RemotingException(e);
+ }
+ }
+
+ /*
+ * public void getTermination() { try { String line = reader.readLine(); if
+ * (line == null || !line.equals("***")) { logFailure(); throw new
+ * RemotingException
+ * ("stream ended abruptly while reading data, aborting request"); } } catch
+ * (IOException e) { logFailure(); throw new RemotingException(e); }
+ *
+ * }
+ */
+ public void notFound(final String message) {
+ writer.print("not-found");
+ writer.print('\n');
+ writer.print(message);
+ writer.print('\n');
+ writer.flush();
+ }
+
+ public void error(final String message) {
+ writer.print("error");
+ writer.print('\n');
+ writer.print(message);
+ writer.print('\n');
+ writer.flush();
+ }
+
+ public void error(final String message, final Exception exception) {
+ error(message);
+ exception.printStackTrace(writer);
+ writer.print('\n');
+ writer.print('\n');
+ writer.flush();
+ }
+
+ private void write(final String result) {
+ writer.print(result);
+ writer.print('\n');
+ writer.flush();
+ }
+
+ public void ok() {
+ response(Util.OK, "");
+ }
+
+ public void abort() {
+ response(Util.ABORT, "");
+ }
+
+ public void response(final boolean flag) {
+ response(Util.OK, " " + (flag ? "true" : "false"));
+ }
+
+ public void response(final long value) {
+ response(Util.OK, " " + Long.toString(value));
+ }
+
+ public void response(final String message) {
+ response(Util.OK, " " + message);
+ }
+
+ private void response(final String status, final String message) {
+ final String response = status + message;
+ LOG.debug("response: " + response);
+ write(response);
+ }
+
+ public void responseData(final String data) {
+ write(data);
+ }
+
+ public void close() {
+ try {
+ reader.close();
+ writer.close();
+ } catch (final IOException e) {
+ logFailure();
+ throw new RemotingException(e);
+ }
+ }
+
+ public void endBlock() {
+ writer.print('\n');
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/680f0c8d/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/Util.java
----------------------------------------------------------------------
diff --git a/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/Util.java b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/Util.java
new file mode 100644
index 0000000..c394997
--- /dev/null
+++ b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/file/server/Util.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.objectstore.nosql.db.file.server;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.isis.objectstore.nosql.NoSqlStoreException;
+
+ /**
+  * Static helpers for the file-based server: location of the data, service and
+  * log directories, construction of the files stored in them, low-level stream
+  * reading, command classification and optional stream tracing.
+  *
+  * <p>The directory locations are held in mutable static fields that are
+  * replaced by {@link #setDirectory(String, String, String, String)}.
+  */
+ public class Util {
+
+     private static final Logger LOG = LoggerFactory.getLogger(Util.class);
+
+     private static final String DEFAULT_DIRECTORY = "data";
+     private static final String SERVICES_DIRECTORY = "services";
+     private static final String LOGS_DIRECTORY = "logs";
+     private static final String LOGS_ARCHIVE_DIRECTORY = "archive";
+     public static final String ABORT = "abort";
+     public static final String OK = "ok";
+     public static final String READ_ERROR = "Read error";
+     public static final String FILE_NOT_FOUND = "File not found";
+     private static final int NEWLINE = '\n';
+
+     /** Name prefix and suffix of the recovery log files, see {@link #logFile(long)}. */
+     private static final String LOG_FILE_PREFIX = "recovery";
+     private static final String LOG_FILE_SUFFIX = ".log";
+
+     static final char DELETE = 'D';
+
+     public static final Charset ENCODING = Charset.forName("utf-8");
+
+     /** Accepts files whose name ends with {@code .data}, {@code .id} or {@code .log}. */
+     private static final FileFilter STORE_FILES = new FileFilter() {
+         @Override
+         public boolean accept(final File pathname) {
+             final String name = pathname.getName();
+             return name.endsWith(".data") || name.endsWith(".id") || name.endsWith(".log");
+         }
+     };
+
+     private static File dataDirectory = new File(DEFAULT_DIRECTORY);
+     private static File serviceDirectory = new File(DEFAULT_DIRECTORY, SERVICES_DIRECTORY);
+     private static File logDirectory = new File(DEFAULT_DIRECTORY, LOGS_DIRECTORY);
+     private static File logArchiveDirectory = new File(DEFAULT_DIRECTORY, LOGS_ARCHIVE_DIRECTORY);
+
+     /**
+      * Replaces the directories used by this class. Any argument that is
+      * {@code null} falls back to its default name; the services, logs and
+      * archive directories are resolved relative to the data directory.
+      *
+      * @param data     data directory, or {@code null} for {@code "data"}
+      * @param services services directory name, or {@code null} for {@code "services"}
+      * @param logs     logs directory name, or {@code null} for {@code "logs"}
+      * @param archive  log archive directory name, or {@code null} for {@code "archive"}
+      */
+     static void setDirectory(final String data, final String services, final String logs, final String archive) {
+         final String directory = data == null ? DEFAULT_DIRECTORY : data;
+         Util.dataDirectory = new File(directory);
+         Util.serviceDirectory = new File(directory, services == null ? SERVICES_DIRECTORY : services);
+         Util.logDirectory = new File(directory, logs == null ? LOGS_DIRECTORY : logs);
+         Util.logArchiveDirectory = new File(directory, archive == null ? LOGS_ARCHIVE_DIRECTORY : archive);
+     }
+
+     /**
+      * Creates the services, logs and log archive directories (including any
+      * missing parents) if they do not exist yet. A directory that cannot be
+      * created is reported in the log; no exception is thrown.
+      */
+     static void ensureDirectoryExists() {
+         ensureExists(serviceDirectory);
+         ensureExists(logDirectory);
+         ensureExists(logArchiveDirectory);
+     }
+
+     /** Creates the directory if absent and logs a warning when that fails. */
+     private static void ensureExists(final File directory) {
+         if (!directory.exists() && !directory.mkdirs()) {
+             LOG.warn("could not create directory " + directory.getAbsolutePath());
+         }
+     }
+
+     /**
+      * Reports whether any sub-directory of the data directory holds more than
+      * one {@code .data}, {@code .id} or {@code .log} file.
+      *
+      * @return {@code true} if such a sub-directory exists; {@code false}
+      *         otherwise, including when the data directory cannot be listed
+      */
+     public static boolean isPopulated() {
+         return anySubDirectoryPopulated();
+     }
+
+     /**
+      * Performs the same check as {@link #isPopulated()}.
+      *
+      * <p>NOTE(review): {@code type} is not consulted, exactly as in the previous
+      * implementation, so the answer covers every sub-directory rather than the
+      * one for the given type. Confirm whether that is intended before relying
+      * on it.
+      *
+      * @param type currently unused
+      * @return {@code true} if any sub-directory of the data directory holds
+      *         more than one store file
+      */
+     public static boolean hasInstances(final String type) {
+         return anySubDirectoryPopulated();
+     }
+
+     /**
+      * Scans the sub-directories of the data directory for one holding more than
+      * one store file. {@link File#listFiles()} returns {@code null} when the
+      * path is not a readable directory, which is treated as "nothing found".
+      */
+     private static boolean anySubDirectoryPopulated() {
+         final File[] entries = dataDirectory.listFiles();
+         if (entries == null) {
+             return false;
+         }
+         for (final File entry : entries) {
+             if (!entry.isDirectory()) {
+                 continue;
+             }
+             final File[] storeFiles = entry.listFiles(STORE_FILES);
+             if (storeFiles != null && storeFiles.length > 1) {
+                 return true;
+             }
+         }
+         return false;
+     }
+
+     /**
+      * Reads one token from the stream: all bytes up to, but excluding, the next
+      * space, newline or end of stream. The terminating byte is consumed.
+      *
+      * <p>The buffer now grows on demand, so tokens longer than 100 bytes no
+      * longer overrun it, and the bytes are decoded with {@link #ENCODING}
+      * instead of the platform default charset.
+      *
+      * @param input stream to read from
+      * @return the token read
+      * @throws IOException         if reading from the stream fails
+      * @throws NoSqlStoreException if no byte precedes the terminator
+      */
+     public static String xxreadNext(final InputStream input) throws IOException {
+         byte[] buffer = new byte[100];
+         int length = 0;
+         int c;
+         while ((c = input.read()) != ' ' && c != '\n' && c != -1) {
+             if (length == buffer.length) {
+                 buffer = grow(buffer);
+             }
+             buffer[length++] = (byte) c;
+         }
+         if (length == 0) {
+             throw new NoSqlStoreException("No data read from " + input);
+         }
+         final String read = new String(buffer, 0, length, ENCODING);
+         LOG.debug("read " + read);
+         return read;
+     }
+
+     /**
+      * Reads a block of bytes that is terminated by two consecutive newlines or
+      * by the end of the stream.
+      *
+      * <p>The returned array now holds exactly the bytes stored; previously the
+      * whole backing buffer, including its unused zero padding, was returned.
+      *
+      * <p>NOTE(review): the byte held in the look-behind position when the
+      * terminator is detected is never stored. For the double-newline case that
+      * is the first newline; for end of stream it is the last byte read. At
+      * least one byte is always stored, even if the stream is already at its
+      * end. This matches the previous behaviour and has been left untouched.
+      *
+      * @param input stream to read from
+      * @return the bytes read, or {@code null} if the first byte is {@code '.'}
+      * @throws IOException if reading from the stream fails
+      */
+     public static byte[] xxreadData(final InputStream input) throws IOException {
+         byte[] buf = new byte[32];
+         int count = 0;
+         int previous = input.read();
+         if (previous == '.') {
+             return null;
+         }
+         int current = input.read();
+         boolean isEnd;
+         do {
+             if (count == buf.length) {
+                 buf = grow(buf);
+             }
+             buf[count++] = (byte) previous;
+             previous = current;
+             current = input.read();
+             isEnd = (previous == NEWLINE && current == NEWLINE) || current == -1;
+         } while (!isEnd);
+
+         final byte[] data = new byte[count];
+         System.arraycopy(buf, 0, data, 0, count);
+         return data;
+     }
+
+     /** Returns a copy of the buffer with twice the capacity. */
+     private static byte[] grow(final byte[] buffer) {
+         final byte[] larger = new byte[buffer.length * 2];
+         System.arraycopy(buffer, 0, larger, 0, buffer.length);
+         return larger;
+     }
+
+     /** Returns the sub-directory of the data directory named after the type. */
+     static File directory(final String type) {
+         return new File(dataDirectory, type);
+     }
+
+     /** Returns the file {@code <id>.data} inside the directory for the type. */
+     static File dataFile(final String type, final String id) {
+         return new File(directory(type), id + ".data");
+     }
+
+     /** Returns the file {@code <name>.id} inside the services directory. */
+     public static File serviceFile(final String name) {
+         return new File(serviceDirectory, name + ".id");
+     }
+
+     /**
+      * Returns the file {@code serialnumbers<name>.data} inside the data
+      * directory, where the name is trimmed of surrounding whitespace first.
+      */
+     public static File serialNumberFile(final String name) {
+         return new File(dataDirectory, "serialnumbers" + name.trim() + ".data");
+     }
+
+     /** Returns the file {@code recovery<id>.log} inside the logs directory. */
+     static File logFile(final long id) {
+         return new File(logDirectory, LOG_FILE_PREFIX + id + LOG_FILE_SUFFIX);
+     }
+
+     /** Returns the file {@code recovery<id>.log.tmp} inside the logs directory. */
+     static File tmpLogFile(final long id) {
+         return new File(logDirectory, LOG_FILE_PREFIX + id + LOG_FILE_SUFFIX + ".tmp");
+     }
+
+     /** Returns the file {@code recovery<id>.log} inside the log archive directory. */
+     public static File archiveLogFile(final long id) {
+         return new File(logArchiveDirectory, LOG_FILE_PREFIX + id + LOG_FILE_SUFFIX);
+     }
+
+     /**
+      * Collects the sequence numbers of the recovery log files found in the
+      * logs directory.
+      *
+      * <p>Only names of the form {@code recovery<number>.log} are considered.
+      * Other entries (temporary logs, unrelated or very short file names) are
+      * skipped; previously a name shorter than twelve characters caused a
+      * {@link StringIndexOutOfBoundsException}.
+      *
+      * @return the range built from the sequence numbers found; left as
+      *         constructed when the directory cannot be listed
+      */
+     static LogRange logFileRange() {
+         final LogRange logRange = new LogRange();
+         final File[] listFiles = logDirectory.listFiles();
+         if (listFiles == null) {
+             return logRange;
+         }
+         final int affixLength = LOG_FILE_PREFIX.length() + LOG_FILE_SUFFIX.length();
+         for (final File file : listFiles) {
+             final String name = file.getName();
+             final boolean isLogName = name.length() >= affixLength
+                     && name.startsWith(LOG_FILE_PREFIX)
+                     && name.endsWith(LOG_FILE_SUFFIX);
+             if (!isLogName) {
+                 continue;
+             }
+             final String sequenceText =
+                     name.substring(LOG_FILE_PREFIX.length(), name.length() - LOG_FILE_SUFFIX.length());
+             try {
+                 logRange.add(Long.parseLong(sequenceText));
+             } catch (final NumberFormatException ignored) {
+                 // Not a numbered recovery log; deliberately skipped.
+             }
+         }
+         return logRange;
+     }
+
+     /** Returns {@code true} if the command is {@link #DELETE}. */
+     public static boolean isDelete(final char command) {
+         return command == DELETE;
+     }
+
+     /** Returns {@code true} for every command other than {@link #DELETE}. */
+     public static boolean isSave(final char command) {
+         return command != DELETE;
+     }
+
+     /** Returns {@code true} if the command is {@code 'D'} or {@code 'U'}. */
+     public static boolean shouldFileExist(final char command) {
+         return command == DELETE || command == 'U';
+     }
+
+     /**
+      * Text appended to a trace for one byte value: values below 32 are rendered
+      * as {@code <n>} followed by the character itself, all others as the
+      * character alone.
+      */
+     private static String traceText(final int b) {
+         return b < 32 ? ("<" + b + ">" + (char) b) : String.valueOf((char) b);
+     }
+
+     /**
+      * Optionally wraps an input stream so that everything read through it is
+      * also recorded in memory.
+      *
+      * <p>The recorded text is only exposed through the wrapper's
+      * {@code toString()} and is never cleared, so it grows for as long as the
+      * stream is read.
+      *
+      * @param inputStream stream to read from
+      * @param isOn        whether tracing is wanted
+      * @return {@code inputStream} itself when {@code isOn} is {@code false},
+      *         otherwise a recording wrapper around it
+      */
+     public static InputStream trace(final InputStream inputStream, final boolean isOn) {
+         if (!isOn) {
+             return inputStream;
+         }
+         return new InputStream() {
+             private final StringBuffer traced = new StringBuffer();
+
+             private void record(final int b) {
+                 traced.append(traceText(b));
+             }
+
+             @Override
+             public int read() throws IOException {
+                 final int b = inputStream.read();
+                 record(b);
+                 return b;
+             }
+
+             @Override
+             public int read(final byte[] b) throws IOException {
+                 final int read = inputStream.read(b);
+                 for (int i = 0; i < read; i++) {
+                     record(b[i]);
+                 }
+                 return read;
+             }
+
+             @Override
+             public int read(final byte[] b, final int off, final int len) throws IOException {
+                 final int read = inputStream.read(b, off, len);
+                 for (int i = 0; i < read; i++) {
+                     record(b[off + i]);
+                 }
+                 return read;
+             }
+
+             @Override
+             public int available() throws IOException {
+                 return inputStream.available();
+             }
+
+             @Override
+             public long skip(final long n) throws IOException {
+                 return inputStream.skip(n);
+             }
+
+             @Override
+             public void close() throws IOException {
+                 inputStream.close();
+             }
+
+             @Override
+             public String toString() {
+                 return "in#" + Long.toHexString(hashCode()) + " " + traced;
+             }
+         };
+     }
+
+     /**
+      * Optionally wraps an output stream so that everything written through it
+      * is also recorded in memory.
+      *
+      * <p>The recorded text is only exposed through the wrapper's
+      * {@code toString()} and is never cleared, so it grows for as long as the
+      * stream is written to.
+      *
+      * @param outputStream stream to write to
+      * @param isOn         whether tracing is wanted
+      * @return {@code outputStream} itself when {@code isOn} is {@code false},
+      *         otherwise a recording wrapper around it
+      */
+     public static OutputStream trace(final OutputStream outputStream, final boolean isOn) {
+         if (!isOn) {
+             return outputStream;
+         }
+         return new OutputStream() {
+             private final StringBuffer traced = new StringBuffer();
+
+             private void record(final int b) {
+                 traced.append(traceText(b));
+             }
+
+             @Override
+             public void write(final int b) throws IOException {
+                 record(b);
+                 outputStream.write(b);
+             }
+
+             @Override
+             public void write(final byte[] b) throws IOException {
+                 for (final byte element : b) {
+                     record(element);
+                 }
+                 outputStream.write(b);
+             }
+
+             @Override
+             public void write(final byte[] b, final int off, final int len) throws IOException {
+                 for (int i = 0; i < len; i++) {
+                     record(b[off + i]);
+                 }
+                 outputStream.write(b, off, len);
+             }
+
+             @Override
+             public void flush() throws IOException {
+                 outputStream.flush();
+             }
+
+             @Override
+             public void close() throws IOException {
+                 outputStream.close();
+             }
+
+             @Override
+             public String toString() {
+                 return "out#" + Long.toHexString(hashCode()) + " " + traced;
+             }
+         };
+     }
+
+     /**
+      * Flushes and closes the stream without propagating any
+      * {@link IOException}. Failures are logged as warnings, and the stream is
+      * closed even when the flush fails.
+      *
+      * @param output stream to close; {@code null} is ignored
+      */
+     public static void closeSafely(final FileOutputStream output) {
+         if (output == null) {
+             return;
+         }
+         try {
+             output.flush();
+         } catch (final IOException e) {
+             LOG.warn("failed to flush output stream before closing it", e);
+         } finally {
+             try {
+                 output.close();
+             } catch (final IOException e) {
+                 LOG.warn("failed to close output stream", e);
+             }
+         }
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/isis/blob/680f0c8d/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/mongo/MongoClientCommandContext.java
----------------------------------------------------------------------
diff --git a/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/mongo/MongoClientCommandContext.java b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/mongo/MongoClientCommandContext.java
new file mode 100644
index 0000000..2745967
--- /dev/null
+++ b/mothballed/component/objectstore/nosql/src/main/java/org/apache/isis/objectstore/nosql/db/mongo/MongoClientCommandContext.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.objectstore.nosql.db.mongo;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.isis.core.metamodel.adapter.oid.Oid;
+import org.apache.isis.core.metamodel.adapter.version.ConcurrencyException;
+import org.apache.isis.core.metamodel.spec.ObjectSpecId;
+import org.apache.isis.objectstore.nosql.NoSqlCommandContext;
+import org.apache.isis.objectstore.nosql.db.StateWriter;
+
+
+ /**
+  * {@link NoSqlCommandContext} backed by a MongoDB {@link DB}: state writers
+  * are {@link MongoStateWriter}s, inserts and updates flush the writer, and
+  * deletes remove the matching document after a version check.
+  */
+ public class MongoClientCommandContext implements NoSqlCommandContext {
+
+     private static final Logger LOG = LoggerFactory.getLogger(MongoClientCommandContext.class);
+     private final DB db;
+
+     /**
+      * @param db database that all commands issued through this context act on
+      */
+     public MongoClientCommandContext(DB db) {
+         this.db = db;
+     }
+
+     /** Does nothing. */
+     @Override
+     public void start() {}
+
+     /** Does nothing. */
+     @Override
+     public void end() {}
+
+     /**
+      * Creates a {@link MongoStateWriter} for the given specification on this
+      * context's database.
+      */
+     @Override
+     public StateWriter createStateWriter(final ObjectSpecId objectSpecId) {
+         return new MongoStateWriter(db, objectSpecId);
+     }
+
+     /**
+      * Removes the document with the given id from the collection named after
+      * the specification, provided its stored version equals {@code version}.
+      *
+      * <p>A document that cannot be found, or one that carries no version, is
+      * reported as a {@link ConcurrencyException}; previously either case
+      * surfaced as a {@link NullPointerException}.
+      *
+      * @param objectSpecId specification whose string form names the collection
+      * @param mongoId      id of the document to remove
+      * @param version      version the caller expects the document to have
+      * @param oid          object identifier, used for reporting and logging
+      * @throws ConcurrencyException if the document is missing or its stored
+      *                              version differs from {@code version}
+      */
+     @Override
+     public void delete(final ObjectSpecId objectSpecId, final String mongoId, final String version, final Oid oid) {
+         final DBCollection instances = db.getCollection(objectSpecId.asString());
+         final DBObject object = instances.findOne(mongoId);
+         if (object == null) {
+             // findOne yields null when no document matches the id.
+             throw new ConcurrencyException("Could not delete object as it no longer exists", oid);
+         }
+         final Object storedVersion = object.get(PropertyNames.VERSION);
+         if (storedVersion == null || !storedVersion.equals(version)) {
+             throw new ConcurrencyException("Could not delete object of different version", oid);
+         }
+         instances.remove(object);
+         LOG.info("removed " + oid);
+     }
+
+     /**
+      * Flushes the writer, which must be a {@link MongoStateWriter}.
+      */
+     @Override
+     public void insert(final StateWriter writer) {
+         ((MongoStateWriter) writer).flush();
+     }
+
+     /**
+      * Flushes the writer, which must be a {@link MongoStateWriter}.
+      */
+     @Override
+     public void update(final StateWriter writer) {
+         ((MongoStateWriter) writer).flush();
+     }
+
+ }