You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@isis.apache.org by da...@apache.org on 2012/12/06 17:59:48 UTC
[42/52] [partial] ISIS-188: more reorganizing of artifacts into
physical directories.
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/ClientConnection.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/ClientConnection.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/ClientConnection.java
new file mode 100644
index 0000000..bfa5829
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/ClientConnection.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+
+import org.apache.log4j.Logger;
+
+import org.apache.isis.core.commons.exceptions.IsisException;
+import org.apache.isis.core.metamodel.adapter.version.ConcurrencyException;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.NoSqlStoreException;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.db.file.server.Util;
+import org.apache.isis.runtimes.dflt.runtime.persistence.ObjectNotFoundException;
+
+/**
+ * Client end of a line-oriented request/response exchange over a pair of streams.
+ *
+ * <p>
+ * Request lines are written with {@link #request(char, String)} and
+ * {@link #requestData(String)}, sections are closed with
+ * {@link #endRequestSection()}, and {@link #validateRequest()} sends the
+ * final blank line, flushes, and reads the response header. The header is a
+ * single space-separated line whose first token is the status; the remaining
+ * tokens are consumed one at a time by the <tt>getResponse*</tt> methods.
+ */
+public class ClientConnection {
+
+    private static final Logger LOG = Logger.getLogger(ClientConnection.class);
+
+    private final InputStream inputStream;
+    private final OutputStream outputStream;
+    private final PrintWriter writer;
+    private final BufferedReader reader;
+    /** Tokens of the most recently read response header line. */
+    private String[] headers;
+    /** Index of the next token of {@link #headers} to hand out. */
+    private int header;
+
+    /**
+     * Wraps both streams with {@link Util#trace} and sets up a writer/reader
+     * pair using {@link Util#ENCODING}.
+     */
+    public ClientConnection(final InputStream input, final OutputStream output) {
+        outputStream = Util.trace(output, true);
+        writer = new PrintWriter(new OutputStreamWriter(outputStream, Util.ENCODING));
+        inputStream = Util.trace(input, true);
+        reader = new BufferedReader(new InputStreamReader(inputStream, Util.ENCODING));
+    }
+
+    /**
+     * Closes the reader and then the writer. A failure to close the reader is
+     * logged and does not prevent the writer from being closed.
+     */
+    public void close() {
+        try {
+            reader.close();
+        } catch (final IOException e) {
+            LOG.error("Failed to close connection", e);
+        }
+        writer.close();
+    }
+
+    void logComplete() {
+        // guard: avoids building the (traced) stream descriptions when debug is off
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("request complete: " + outputStream);
+            LOG.debug("response complete: " + inputStream);
+        }
+    }
+
+    void logFailure() {
+        LOG.info("request failed: " + outputStream);
+        LOG.info("response failed: " + inputStream);
+    }
+
+    /**
+     * Writes a request line consisting of the command character immediately
+     * followed by the request text.
+     */
+    public void request(final char command, final String request) {
+        LOG.debug("request: " + command + request);
+        write(command + request);
+    }
+
+    /**
+     * Terminates the request with a blank line, flushes it, reads the
+     * response header and checks its status token.
+     *
+     * @throws RemotingException
+     *             if the status is <tt>error</tt>, is not recognised, or no
+     *             header could be read
+     * @throws ObjectNotFoundException
+     *             if the status is <tt>not-found</tt>
+     * @throws ConcurrencyException
+     *             if the status is <tt>concurrency</tt>
+     */
+    public void validateRequest() {
+        writer.print('\n');
+        writer.flush();
+        getReponseHeader();
+        final String status = readNext();
+        if (status.equals("ok")) {
+            return;
+        }
+        if (status.equals("error")) {
+            throw new RemotingException(getResponseData());
+        }
+        if (status.equals("not-found")) {
+            throw new ObjectNotFoundException(getResponseData());
+        }
+        if (status.equals("concurrency")) {
+            // TODO create better exceptions (requires way to restore
+            // object/version)
+            final String data = getResponseData();
+            throw new ConcurrencyException(data, null);
+        }
+        throw new RemotingException("Invalid status in response: " + status);
+    }
+
+    /** Writes one line of request data. */
+    public void requestData(final String data) {
+        write(data);
+    }
+
+    /** Writes a blank line; the output is not flushed. */
+    public void endRequestSection() {
+        writer.print('\n');
+    }
+
+    private void write(final String req) {
+        writer.print(req);
+        writer.print('\n');
+    }
+
+    /**
+     * Reads the next line from the input and splits it on single spaces into
+     * the header tokens, restarting token consumption from the first token.
+     *
+     * @throws NoSqlStoreException
+     *             if reading fails with an I/O error
+     * @throws RemotingException
+     *             if the end of the stream is reached before a header line
+     */
+    public void getReponseHeader() {
+        final String response;
+        try {
+            response = reader.readLine();
+        } catch (final IOException e) {
+            throw new NoSqlStoreException(e);
+        }
+        LOG.debug("response: " + response);
+        if (response == null) {
+            // readLine() returns null at end of stream; fail with a clear
+            // message instead of a NullPointerException on split()
+            throw new RemotingException("No response header received: end of stream reached");
+        }
+        headers = response.split(" ");
+        // a fresh header must be consumed from its first token
+        header = 0;
+    }
+
+    /** Returns the next unread header token. */
+    public String getResponse() {
+        return readNext();
+    }
+
+    /** Returns true only if the next unread header token is exactly <tt>true</tt>. */
+    public boolean getResponseAsBoolean() {
+        return readNext().equals("true");
+    }
+
+    /**
+     * Parses the next unread header token as a decimal long.
+     *
+     * @throws NumberFormatException
+     *             if the token is not a valid long
+     */
+    public long getResponseAsLong() {
+        return Long.parseLong(readNext());
+    }
+
+    /**
+     * Read all the data until the next blank line (or the end of the
+     * stream). Each line read is returned terminated by a newline.
+     *
+     * @throws RemotingException
+     *             if reading fails
+     */
+    public String getResponseData() {
+        try {
+            final StringBuilder buffer = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null && line.length() > 0) {
+                buffer.append(line);
+                buffer.append('\n');
+            }
+            return buffer.toString();
+        } catch (final Exception e) {
+            logFailure();
+            // pass the throwable separately so the stack trace is logged
+            LOG.error("Failed to read response data", e);
+            throw new RemotingException(e);
+        }
+    }
+
+    private String readNext() {
+        if (headers == null) {
+            throw new RemotingException("attempting to read header property before a response header has been read");
+        }
+        if (header >= headers.length) {
+            throw new RemotingException("attempting to reader header property (index) " + header + " when there are only " + headers.length);
+        }
+        return headers[header++];
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/FileClientCommandContext.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/FileClientCommandContext.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/FileClientCommandContext.java
new file mode 100644
index 0000000..c747456
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/FileClientCommandContext.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file;
+
+import java.util.zip.CRC32;
+
+import org.apache.isis.core.metamodel.adapter.oid.Oid;
+import org.apache.isis.core.metamodel.spec.ObjectSpecId;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.NoSqlCommandContext;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.db.StateWriter;
+
+/**
+ * Command context that turns insert, update and delete commands into request
+ * lines on a {@link ClientConnection}.
+ */
+class FileClientCommandContext implements NoSqlCommandContext {
+
+    /** Number of hex digits the checksum prefix is padded to. */
+    private static final int CHECKSUM_WIDTH = 8;
+
+    private final ClientConnection connection;
+
+    public FileClientCommandContext(final ClientConnection connection) {
+        this.connection = connection;
+    }
+
+    @Override
+    public void start() {
+        // nothing to do
+    }
+
+    @Override
+    public void end() {
+        // nothing to do
+    }
+
+    @Override
+    public StateWriter createStateWriter(final ObjectSpecId specificationName) {
+        return new JsonStateWriter();
+    }
+
+    /**
+     * Sends a <tt>D</tt> request made up of the spec id, key and version
+     * followed by the literal <tt>null</tt>; the oid argument is not used.
+     */
+    @Override
+    public void delete(final ObjectSpecId objectSpecId, final String key, final String version, final Oid oid) {
+        final String request = objectSpecId + " " + key + " " + version + " null";
+        connection.request('D', request);
+        connection.endRequestSection();
+    }
+
+    @Override
+    public void insert(final StateWriter writer) {
+        write('I', (JsonStateWriter) writer);
+    }
+
+    @Override
+    public void update(final StateWriter writer) {
+        write('U', (JsonStateWriter) writer);
+    }
+
+    /**
+     * Sends the command line for the writer, then its data prefixed with an
+     * eight-digit hex checksum, then closes the request section.
+     */
+    private void write(final char command, final JsonStateWriter writer) {
+        connection.request(command, writer.getRequest());
+        final String payload = writer.getData();
+        connection.requestData(checksumPrefix(payload) + payload);
+        connection.endRequestSection();
+    }
+
+    /**
+     * Computes the CRC32 of the payload bytes plus a trailing newline and
+     * renders it as lower-case hex, left-padded with zeros to eight digits.
+     */
+    private static String checksumPrefix(final String payload) {
+        final CRC32 crc = new CRC32();
+        // NOTE(review): getBytes() uses the platform default charset - confirm
+        // the receiving side computes its checksum over the same bytes
+        crc.update(payload.getBytes());
+        crc.update('\n');
+
+        final StringBuilder hex = new StringBuilder(Long.toHexString(crc.getValue()));
+        while (hex.length() < CHECKSUM_WIDTH) {
+            hex.insert(0, '0');
+        }
+        return hex.toString();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/FileServerDb.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/FileServerDb.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/FileServerDb.java
new file mode 100644
index 0000000..c486907
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/FileServerDb.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.zip.CRC32;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.isis.core.metamodel.adapter.ObjectAdapter;
+import org.apache.isis.core.metamodel.adapter.version.ConcurrencyException;
+import org.apache.isis.core.metamodel.spec.ObjectSpecId;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.NoSqlCommandContext;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.NoSqlStoreException;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.db.NoSqlDataDatabase;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.db.StateReader;
+import org.apache.isis.runtimes.dflt.runtime.persistence.objectstore.transaction.PersistenceCommand;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link NoSqlDataDatabase} that talks to a remote file server, opening a new
+ * socket (and {@link ClientConnection}) for every operation.
+ *
+ * <p>
+ * Each operation closes its connection when it finishes, whether it succeeds
+ * or fails.
+ */
+public class FileServerDb implements NoSqlDataDatabase {
+
+    private static final Logger LOG = Logger.getLogger(FileServerDb.class);
+
+    /** Number of leading hex digits that carry the checksum in a data block. */
+    private static final int CHECKSUM_LENGTH = 8;
+
+    private final String host;
+    private final int port;
+
+    private final int timeout;
+
+    /**
+     * @param host
+     *            server host name
+     * @param port
+     *            server port; 0 selects the default port 9012
+     * @param timeout
+     *            socket read timeout, passed to {@link Socket#setSoTimeout(int)}
+     */
+    public FileServerDb(final String host, final int port, final int timeout) {
+        this.host = host;
+        this.port = port == 0 ? 9012 : port;
+        this.timeout = timeout;
+    }
+
+    // TODO pool connection and reuse
+    private ClientConnection getConnection() {
+        Socket socket = null;
+        try {
+            socket = new Socket(host, port);
+            socket.setSoTimeout(timeout);
+            return new ClientConnection(socket.getInputStream(), socket.getOutputStream());
+        } catch (final UnknownHostException e) {
+            throw new NoSqlStoreException("Unknow host " + host, e);
+        } catch (final IOException e) {
+            // the socket may already be open if configuring it or obtaining
+            // its streams failed; make sure it is not leaked
+            closeQuietly(socket);
+            throw new NoSqlStoreException("Failed to connect to " + host + ":" + port, e);
+        }
+
+    }
+
+    /** Closes the socket if there is one, ignoring any failure to do so. */
+    private static void closeQuietly(final Socket socket) {
+        if (socket == null) {
+            return;
+        }
+        try {
+            socket.close();
+        } catch (final IOException ignored) {
+            // the original failure is the one reported to the caller
+        }
+    }
+
+    // TODO pool connection and reuse
+    private void returnConnection(final ClientConnection connection) {
+        connection.logComplete();
+        connection.close();
+    }
+
+    // TODO pool connection and reuse - probably need to replace the connection
+    private void abortConnection(final ClientConnection connection) {
+        connection.logFailure();
+        connection.close();
+    }
+
+    /**
+     * Requests a single instance (<tt>R</tt> command) and returns a reader
+     * over its checksum-verified JSON data.
+     */
+    @Override
+    public StateReader getInstance(final String key, final ObjectSpecId objectSpecId) {
+        final ClientConnection connection = getConnection();
+        final JsonStateReader reader;
+        try {
+            final String request = objectSpecId + " " + key;
+            connection.request('R', request);
+            connection.validateRequest();
+            // verified and parsed inside the try so that a checksum or parse
+            // failure still closes the connection
+            final String data = checkData(connection.getResponseData());
+            reader = new JsonStateReader(data);
+        } catch (final RuntimeException e) {
+            LOG.error("aborting getInstance", e);
+            abortConnection(connection);
+            throw e;
+        }
+        returnConnection(connection);
+        return reader;
+    }
+
+    /**
+     * Requests all instances of the given type (<tt>L</tt> command) and reads
+     * data blocks until an empty one is returned.
+     */
+    @Override
+    public Iterator<StateReader> instancesOf(final ObjectSpecId objectSpecId) {
+        final ClientConnection connection = getConnection();
+        final List<StateReader> instances = new ArrayList<StateReader>();
+        try {
+            connection.request('L', objectSpecId + " 0");
+            connection.validateRequest();
+            String data;
+            while ((data = connection.getResponseData()).length() > 0) {
+                data = checkData(data);
+                final JsonStateReader reader = new JsonStateReader(data);
+                instances.add(reader);
+            }
+        } catch (final RuntimeException e) {
+            LOG.error("aborting instancesOf", e);
+            abortConnection(connection);
+            throw e;
+        }
+        returnConnection(connection);
+        return instances.iterator();
+
+    }
+
+    /**
+     * Verifies the CRC32 checksum carried in the first eight hex digits of
+     * the data against the remainder, and returns that remainder.
+     *
+     * @throws NoSqlStoreException
+     *             if the data is too short to contain a checksum, the
+     *             checksum is not valid hex, or the checksums differ
+     */
+    private String checkData(final String data) {
+        if (data.length() < CHECKSUM_LENGTH) {
+            throw new NoSqlStoreException("Data integrity error; data too short to contain a checksum");
+        }
+        final String encodedChecksum = data.substring(0, CHECKSUM_LENGTH);
+        final String objectData = data.substring(CHECKSUM_LENGTH);
+
+        final long expectedChecksum;
+        try {
+            expectedChecksum = Long.parseLong(encodedChecksum, 16);
+        } catch (final NumberFormatException e) {
+            throw new NoSqlStoreException("Data integrity error; invalid checksum " + encodedChecksum, e);
+        }
+
+        final CRC32 inputChecksum = new CRC32();
+        inputChecksum.update(objectData.getBytes());
+        final long actualChecksum = inputChecksum.getValue();
+
+        if (actualChecksum != expectedChecksum) {
+            throw new NoSqlStoreException("Data integrity error; checksums different");
+        }
+
+        return objectData;
+    }
+
+    /**
+     * Sends all commands within a single <tt>W</tt> request and validates
+     * the server's response.
+     */
+    @Override
+    public void write(final List<PersistenceCommand> commands) {
+        final ClientConnection connection = getConnection();
+        PersistenceCommand currentCommand = null;
+        try {
+            connection.request('W', "");
+            final NoSqlCommandContext context = new FileClientCommandContext(connection);
+            for (final PersistenceCommand command : commands) {
+                currentCommand = command;
+                command.execute(context);
+            }
+            connection.validateRequest();
+
+        } catch (final ConcurrencyException e) {
+            // not logged as an error, but the connection must still be closed
+            abortConnection(connection);
+            throw e;
+        } catch (final RuntimeException e) {
+            LOG.error("aborting write, command: " + currentCommand, e);
+            abortConnection(connection);
+            throw e;
+        }
+        returnConnection(connection);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public boolean containsData() {
+        final ClientConnection connection = getConnection();
+        boolean flag;
+        try {
+            connection.request('X', "contains-data");
+            connection.validateRequest();
+            flag = connection.getResponseAsBoolean();
+        } catch (final RuntimeException e) {
+            LOG.error("aborting containsData", e);
+            abortConnection(connection);
+            throw e;
+        }
+        returnConnection(connection);
+        return flag;
+    }
+
+    @Override
+    public long nextSerialNumberBatch(final ObjectSpecId name, final int batchSize) {
+        final ClientConnection connection = getConnection();
+        long serialNumber;
+        try {
+            connection.request('N', name + " " + Integer.toString(batchSize));
+            connection.validateRequest();
+            serialNumber = connection.getResponseAsLong();
+        } catch (final RuntimeException e) {
+            LOG.error("aborting nextSerialNumberBatch", e);
+            abortConnection(connection);
+            throw e;
+        }
+        returnConnection(connection);
+        return serialNumber;
+    }
+
+    @Override
+    public void addService(final ObjectSpecId objectSpecId, final String key) {
+        final ClientConnection connection = getConnection();
+        try {
+            connection.request('T', objectSpecId.asString() + " " + key);
+            connection.validateRequest();
+        } catch (final RuntimeException e) {
+            LOG.error("aborting addService", e);
+            abortConnection(connection);
+            throw e;
+        }
+        returnConnection(connection);
+    }
+
+    /**
+     * Looks up the key registered for a service (<tt>S</tt> command).
+     *
+     * @return the key, or null if the server answered with the literal
+     *         <tt>null</tt>
+     */
+    @Override
+    public String getService(final ObjectSpecId objectSpecId) {
+        final ClientConnection connection = getConnection();
+        String response;
+        try {
+            connection.request('S', objectSpecId.asString());
+            connection.validateRequest();
+            response = connection.getResponse();
+        } catch (final RuntimeException e) {
+            LOG.error("aborting getServices", e);
+            abortConnection(connection);
+            throw e;
+        }
+        returnConnection(connection);
+        return response.equals("null") ? null : response;
+    }
+
+    @Override
+    public boolean hasInstances(final ObjectSpecId objectSpecId) {
+        final ClientConnection connection = getConnection();
+        boolean hasInstances;
+        try {
+            connection.request('I', objectSpecId.asString());
+            connection.validateRequest();
+            hasInstances = connection.getResponseAsBoolean();
+        } catch (final RuntimeException e) {
+            LOG.error("aborting hasInstances", e);
+            abortConnection(connection);
+            throw e;
+        }
+        returnConnection(connection);
+        return hasInstances;
+    }
+
+    public Iterator<StateReader> instancesOf(ObjectSpecId specId, ObjectAdapter pattern) {
+        // TODO implement
+        throw new NotImplementedException();
+    }
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/FileServerPersistorMechanismInstaller.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/FileServerPersistorMechanismInstaller.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/FileServerPersistorMechanismInstaller.java
new file mode 100644
index 0000000..0bbcbb3
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/FileServerPersistorMechanismInstaller.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file;
+
+import org.apache.isis.core.commons.config.ConfigurationConstants;
+import org.apache.isis.core.commons.config.IsisConfiguration;
+import org.apache.isis.core.metamodel.progmodel.ProgrammingModel;
+import org.apache.isis.core.metamodel.specloader.validator.MetaModelValidator;
+import org.apache.isis.core.metamodel.specloader.validator.MetaModelValidatorComposite;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.db.NoSqlDataDatabase;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.db.NoSqlPersistorMechanismInstaller;
+
+/**
+ * Installer, registered under the name <tt>fileserver</tt>, that creates a
+ * {@link FileServerDb} from the <tt>nosql.fileserver.*</tt> configuration
+ * properties.
+ */
+public class FileServerPersistorMechanismInstaller extends NoSqlPersistorMechanismInstaller {
+
+    private static final String ROOT = ConfigurationConstants.ROOT + "nosql.fileserver.";
+    private static final String DB_HOST = ROOT + "host";
+    private static final String DB_PORT = ROOT + "port";
+    private static final String DB_TIMEOUT = ROOT + "timeout";
+
+    public FileServerPersistorMechanismInstaller() {
+        super("fileserver");
+    }
+
+    /**
+     * Reads host (default <tt>localhost</tt>), port (default 0) and timeout
+     * (default 5000) from the configuration and builds the database client.
+     */
+    @Override
+    protected NoSqlDataDatabase createNoSqlDatabase(final IsisConfiguration configuration) {
+        final String host = configuration.getString(DB_HOST, "localhost");
+        final int port = configuration.getInteger(DB_PORT, 0);
+        final int timeout = configuration.getInteger(DB_TIMEOUT, 5000);
+        return new FileServerDb(host, port, timeout);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/JsonStateReader.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/JsonStateReader.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/JsonStateReader.java
new file mode 100644
index 0000000..994c434
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/JsonStateReader.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import org.apache.isis.runtimes.dflt.objectstores.nosql.NoSqlStoreException;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.db.StateReader;
+
+/**
+ * {@link StateReader} backed by a JSON object.
+ *
+ * <p>
+ * Values stored as JSON null are read back as Java null (or 0 for long
+ * fields) rather than failing with a {@link ClassCastException}.
+ */
+public class JsonStateReader implements StateReader {
+
+    // private static final Logger LOG = Logger.getLogger(FileStateReader.class);
+
+    private final JSONObject instance;
+
+    /**
+     * Parses the text form of a JSON object.
+     *
+     * @throws NoSqlStoreException
+     *             if the text cannot be parsed
+     */
+    public JsonStateReader(final String data) {
+        try {
+            instance = new JSONObject(data);
+        } catch (final JSONException e) {
+            throw new NoSqlStoreException("failed initialise JSON object for text form: " + data, e);
+        }
+    }
+
+    private JsonStateReader(final JSONObject aggregatedObject) {
+        instance = aggregatedObject;
+    }
+
+    /**
+     * @return a reader over the nested object stored under the name, or null
+     *         if there is no such entry or it is not a JSON object
+     */
+    @Override
+    public StateReader readAggregate(final String name) {
+        // optJSONObject already yields null for a missing or non-object entry
+        final JSONObject aggregatedObject = instance.optJSONObject(name);
+        return aggregatedObject == null ? null : new JsonStateReader(aggregatedObject);
+    }
+
+    /**
+     * @return the value parsed as a long, or 0 if the entry is missing or is
+     *         JSON null
+     * @throws NumberFormatException
+     *             if the stored text is not a valid long
+     */
+    @Override
+    public long readLongField(final String id) {
+        final Object value = instance.opt(id);
+        if (value == null || JSONObject.NULL.equals(value)) {
+            return 0;
+        }
+        if (value instanceof Number) {
+            // tolerate data in which the value was stored as a JSON number
+            return ((Number) value).longValue();
+        }
+        return Long.parseLong(value.toString());
+    }
+
+    /**
+     * @return the string form of the entry, or null if there is no entry of
+     *         that name
+     */
+    @Override
+    public String readField(final String name) {
+        return instance.has(name) ? instance.optString(name) : null;
+    }
+
+//    @Override
+//    public String readObjectType() {
+//        return readRequiredField("_type");
+//    }
+//
+//    @Override
+//    public String readId() {
+//        return readRequiredField("_id");
+//    }
+
+    @Override
+    public String readOid() {
+        return readRequiredField(PropertyNames.OID);
+    }
+
+    @Override
+    public String readVersion() {
+        return readRequiredField(PropertyNames.VERSION);
+    }
+
+    /**
+     * @return the value of the <tt>_encrypt</tt> entry, or <tt>none</tt> if
+     *         there is no such entry
+     */
+    @Override
+    public String readEncrytionType() {
+        try {
+            return instance.has("_encrypt") ? instance.getString("_encrypt") : "none";
+        } catch (final JSONException e) {
+            throw new NoSqlStoreException("failed to read field _encrypt", e);
+        }
+    }
+
+    @Override
+    public String readUser() {
+        return readRequiredField(PropertyNames.USER);
+    }
+
+    @Override
+    public String readTime() {
+        return readRequiredField(PropertyNames.TIME);
+    }
+
+    /**
+     * Reads an entry that must be present.
+     *
+     * @return the string form of the value, or null if it is JSON null
+     * @throws NoSqlStoreException
+     *             if there is no entry of that name
+     */
+    private String readRequiredField(final String name) {
+        try {
+            final Object value = instance.get(name);
+            // a blind (String) cast fails for JSON null and non-string values
+            return JSONObject.NULL.equals(value) ? null : value.toString();
+        } catch (final JSONException e) {
+            throw new NoSqlStoreException("failed to read field " + name, e);
+        }
+    }
+
+    /**
+     * @return one reader per element of the array stored under the id; an
+     *         empty list if there is no such array
+     */
+    @Override
+    public List<StateReader> readCollection(final String id) {
+        final JSONArray array = instance.optJSONArray(id);
+        final List<StateReader> readers = new ArrayList<StateReader>();
+        if (array != null) {
+            final int size = array.length();
+            for (int i = 0; i < size; i++) {
+                readers.add(new JsonStateReader(array.optJSONObject(i)));
+            }
+        }
+        return readers;
+    }
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/JsonStateWriter.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/JsonStateWriter.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/JsonStateWriter.java
new file mode 100644
index 0000000..56a70d2
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/JsonStateWriter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file;
+
+import java.util.List;
+
+import org.apache.isis.core.metamodel.adapter.oid.OidMarshaller;
+import org.apache.isis.core.metamodel.adapter.oid.TypedOid;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.NoSqlStoreException;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.db.StateWriter;
+import org.apache.isis.runtimes.dflt.runtime.system.context.IsisContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+
+public class JsonStateWriter implements StateWriter {
+
+ private final JSONObject dbObject = new JSONObject();
+
+ private TypedOid oid;
+ private String currentVersion;
+ private String newVersion;
+
+
+ @Override
+ public StateWriter addAggregate(final String id) {
+ final JsonStateWriter jsonStateWriter = new JsonStateWriter();
+ try {
+ dbObject.put(id, jsonStateWriter.dbObject);
+ } catch (final JSONException e) {
+ throw new NoSqlStoreException(e);
+ }
+ return jsonStateWriter;
+ }
+
+ @Override
+ public void writeOid(final TypedOid typedOid) {
+ this.oid = typedOid;
+ writeField(PropertyNames.OID, typedOid.enString(getOidMarshaller()));
+ }
+
+ @Override
+ public void writeEncryptionType(final String type) {
+ writeField(PropertyNames.ENCRYPT, type);
+ }
+
+ @Override
+ public void writeVersion(final String currentVersion, final String newVersion) {
+ this.currentVersion = currentVersion;
+ this.newVersion = newVersion;
+ writeField(PropertyNames.VERSION, newVersion);
+ }
+
+ @Override
+ public void writeTime(final String time) {
+ writeField(PropertyNames.TIME, time);
+ }
+
+ @Override
+ public void writeUser(final String user) {
+ writeField(PropertyNames.USER, user);
+ }
+
+ @Override
+ public void writeField(final String id, final String data) {
+ try {
+ dbObject.put(id, data == null ? JSONObject.NULL : data);
+ } catch (final JSONException e) {
+ throw new NoSqlStoreException(e);
+ }
+ }
+
+ @Override
+ public void writeField(final String id, final long l) {
+ try {
+ dbObject.put(id, Long.toString(l));
+ } catch (final JSONException e) {
+ throw new NoSqlStoreException(e);
+ }
+ }
+
+ public String getRequest() {
+ return oid.enString(getOidMarshaller()) + " " + currentVersion + " " + newVersion;
+ }
+
+ public String getData() {
+ try {
+ return dbObject.toString(4);
+ } catch (final JSONException e) {
+ throw new NoSqlStoreException(e);
+ }
+ }
+
+ @Override
+ public StateWriter createElementWriter() {
+ return new JsonStateWriter();
+ }
+
+ @Override
+ public void writeCollection(final String id, final List<StateWriter> elements) {
+ final List<JSONObject> collection = Lists.newArrayList();
+ for (final StateWriter writer : elements) {
+ collection.add(((JsonStateWriter) writer).dbObject);
+ }
+ try {
+ dbObject.put(id, collection);
+ } catch (final JSONException e) {
+ throw new NoSqlStoreException(e);
+ }
+ }
+
+
+ ///////////////////////////////////////////////////////
+ // dependencies (from context)
+ ///////////////////////////////////////////////////////
+
+ protected OidMarshaller getOidMarshaller() {
+ return IsisContext.getOidMarshaller();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/PropertyNames.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/PropertyNames.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/PropertyNames.java
new file mode 100644
index 0000000..6cf57e3
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/PropertyNames.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file;
+
+/**
+ * String constants for the underscore-prefixed property names shared within
+ * this package. Interface fields are implicitly {@code public static final}.
+ */
+interface PropertyNames {
+    String ENCRYPT = "_encrypt";
+
+    // String TYPE = "_type";
+    // String ID = "_id";
+
+    String OID = "_oid";
+    String VERSION = "_version";
+    String TIME = "_time";
+    String USER = "_user";
+}
+
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/RemotingException.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/RemotingException.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/RemotingException.java
new file mode 100644
index 0000000..c0b6a60
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/RemotingException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file;
+
+import org.apache.isis.core.commons.exceptions.IsisException;
+
+/**
+ * An {@link IsisException} that can be raised either with a descriptive
+ * message or by wrapping the exception that caused it.
+ */
+public class RemotingException extends IsisException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param e the underlying exception to wrap
+     */
+    public RemotingException(final Exception e) {
+        super(e);
+    }
+
+    /**
+     * @param message description of the failure
+     */
+    public RemotingException(final String message) {
+        super(message);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/DataFileReader.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/DataFileReader.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/DataFileReader.java
new file mode 100644
index 0000000..0e6774d
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/DataFileReader.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file.server;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.log4j.Logger;
+
+public class DataFileReader {
+ private static final Logger LOG = Logger.getLogger(DataFileReader.class);
+
+ private final BufferedReader reader;
+ private final String id;
+ private final String version;
+
+ /**
+ * Opens the file for the specified id. The top line contains: type id
+ * version newline The remainder contains the data.
+ */
+ public DataFileReader(final String type, final String id) throws IOException {
+ final File file = Util.dataFile(type, id);
+ reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "utf-8"));
+ final String line = reader.readLine();
+ if (line == null || line.length() == 0) {
+ throw new FileServerException("No data in file: " + file.getAbsolutePath());
+ }
+ final String[] split = line.split(" ");
+ this.id = split[1];
+ if (!id.equals(id)) {
+ throw new FileServerException("Id in file (" + this.id + ") not the same as the file name: " + file.getAbsolutePath());
+ }
+ version = split[2];
+ }
+
+ public void close() {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (final IOException e) {
+ LOG.error("Failed to close reader " + reader, e);
+ }
+ }
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public String getData() {
+ try {
+ final StringBuffer buffer = new StringBuffer();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ buffer.append(line);
+ buffer.append("\n");
+ }
+ return buffer.toString();
+ } catch (final IOException e) {
+ throw new FileServerException("Failed to read data for " + id, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/DataFileWriter.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/DataFileWriter.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/DataFileWriter.java
new file mode 100644
index 0000000..cf57046
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/DataFileWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file.server;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Applies a list of {@link FileContent} entries to the data directory:
+ * entries whose command is a delete remove the data file, all others
+ * (re)write it.
+ */
+public class DataFileWriter {
+
+    // private static final Logger LOG = Logger.getLogger(DataWriter.class);
+
+    private final List<FileContent> files;
+
+    /**
+     * @param files the entries to apply; the list is held by reference, so
+     *              entries added later are seen by {@link #writeData()}
+     */
+    public DataFileWriter(final List<FileContent> files) {
+        this.files = files;
+    }
+
+    /**
+     * Applies every entry in order.
+     *
+     * @throws IOException if a data file cannot be written
+     */
+    public void writeData() throws IOException {
+        for (final FileContent content : files) {
+            if (Util.isDelete(content.command)) {
+                final File f = Util.dataFile(content.type, content.id);
+                // Best effort: the result is ignored on purpose, the file may
+                // already be gone.
+                f.delete();
+            } else {
+                writeFile(content);
+            }
+        }
+    }
+
+    // TODO to be consistent use PrintWriter
+    private void writeFile(final FileContent content) throws IOException {
+        FileOutputStream output = null;
+        final File file = Util.dataFile(content.type, content.id);
+        final File directory = file.getParentFile();
+        if (!directory.exists()) {
+            // mkdirs (not mkdir) so that missing ancestor directories are
+            // created too; if creation fails the FileOutputStream below
+            // reports it as an IOException.
+            directory.mkdirs();
+        }
+        try {
+            output = new FileOutputStream(file);
+            content.write(output);
+        } finally {
+            Util.closeSafely(output);
+        }
+    }
+
+    /**
+     * Currently a no-op; each file is closed as soon as it is written.
+     */
+    public void close() {
+        // TODO Auto-generated method stub
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/FileContent.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/FileContent.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/FileContent.java
new file mode 100644
index 0000000..3f4b077
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/FileContent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file.server;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Immutable description of one data-file operation: the command character,
+ * the object's type, id and versions, and the data to store.
+ */
+class FileContent {
+
+    private static final String ENCODING = "utf-8";
+
+    final char command;
+    final String id;
+    final String currentVersion;
+    final String newVersion;
+    final String data;
+    final String type;
+
+    public FileContent(final char command, final String id, final String currentVersion, final String newVersion, final String type, final String buf) {
+        this.command = command;
+        this.type = type;
+        this.id = id;
+        this.currentVersion = currentVersion;
+        this.newVersion = newVersion;
+        this.data = buf;
+    }
+
+    /**
+     * Writes the header line ({@code type id newVersion}) followed by the
+     * data, all encoded as UTF-8.
+     *
+     * @param output the stream to write to; it is not closed
+     * @throws IOException if the stream cannot be written
+     */
+    public void write(final OutputStream output) throws IOException {
+        writeText(output, type);
+        output.write(' ');
+        writeText(output, id);
+        output.write(' ');
+        writeText(output, newVersion);
+        output.write('\n');
+        writeText(output, data);
+    }
+
+    /** Encodes the text as UTF-8 and writes the bytes to the stream. */
+    private static void writeText(final OutputStream output, final String text) throws IOException {
+        output.write(text.getBytes(ENCODING));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/FileServer.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/FileServer.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/FileServer.java
new file mode 100644
index 0000000..9f7edf6
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/FileServer.java
@@ -0,0 +1,686 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file.server;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.SystemConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+
+import org.apache.isis.core.commons.lang.CastUtils;
+import org.apache.isis.runtimes.dflt.objectstores.nosql.NoSqlStoreException;
+
+public class FileServer {
+
+ private static final Logger LOG = Logger.getLogger(FileServer.class);
+ // Host used for the service, control and sync sockets when the configuration names none.
+ private static final String DEFAULT_HOST = "localhost";
+ // Default ports for the service, control and sync sockets respectively.
+ private static final int DEFAULT_SERVICE_PORT = 9100;
+ private static final int DEFAULT_CONTROL_PORT = 9101;
+ private static final int DEFAULT_SYNC_PORT = 9102;
+ // Backlog argument handed to the service ServerSocket.
+ private static final int BACKLOG = 0;
+ // Marker bytes of the sync protocol: INIT opens a sync session and
+ // RECOVERY_LOG precedes each transferred recovery log file.
+ private static final int INIT = 1;
+ private static final int RECOVERY_LOG = 2;
+
+ /**
+  * Command-line entry point. Recognises {@code -h/--help} and
+  * {@code -m/--mode}; a missing mode is treated as {@code normal} and an
+  * unrecognised mode prints the help text. Remaining arguments are passed to
+  * the recovery and archive modes.
+  */
+ public static void main(final String[] args) throws IOException, ParseException {
+     final Options cliOptions = new Options();
+     cliOptions.addOption("h", "help", false, "Show this help");
+     cliOptions.addOption("m", "mode", true, "mode: normal | secondary | recovery | archive");
+
+     final CommandLineParser cliParser = new BasicParser();
+     final CommandLine commandLine = cliParser.parse(cliOptions, args);
+     if (commandLine.hasOption('h')) {
+         printHelp(cliOptions);
+         return;
+     }
+
+     final String mode = commandLine.getOptionValue("m");
+     final List<String> remainingArgs = CastUtils.cast(commandLine.getArgList());
+
+     // The server is only constructed once a valid mode is known, because
+     // construction loads configuration and may terminate the JVM.
+     if (mode == null || "normal".equals(mode)) {
+         new FileServer().startNormal();
+         return;
+     }
+     if ("secondary".equals(mode)) {
+         new FileServer().startSecondary();
+         return;
+     }
+     if ("recovery".equals(mode)) {
+         new FileServer().startRecovery(remainingArgs);
+         return;
+     }
+     if ("archive".equals(mode)) {
+         new FileServer().startArchive(remainingArgs);
+         return;
+     }
+     printHelp(cliOptions);
+ }
+
+ /**
+  * Prints the usage line and the option descriptions via commons-cli's
+  * {@code HelpFormatter}.
+  *
+  * @param options the options to describe
+  */
+ private static void printHelp(final Options options) {
+     final HelpFormatter help = new HelpFormatter();
+     // The usage line previously misspelt the program name as "FileSever".
+     help.printHelp("FileServer [OPTIONS] [FIRST RECOVERY FILES] [LAST RECOVERY FILES]", options);
+ }
+
+ // Both are assigned in the constructor, before any worker thread is started.
+ private FileServerProcessor server;
+ private CompositeConfiguration config;
+
+ // The flags and the counter below are written by one thread and polled by
+ // others (control, service, sync and log-rolling threads), so they are
+ // volatile to guarantee that updates become visible to the polling loops.
+
+ // Cleared by the control connection's "shutdown" command to end the worker loops.
+ private volatile boolean awaitConnections = true;
+ // Set and cleared by the "quiesce" and "resume" control commands.
+ private volatile boolean isQuiescent = false;
+ // Incremented only by the service thread; volatile gives visibility to the
+ // control thread's "status" command (the increment itself is not atomic).
+ private volatile long requests;
+
+ /**
+  * Configures log4j from {@code config/logging.properties}, builds the
+  * configuration from system properties plus {@code config/server.properties},
+  * hands the configured directories to {@code Util} and creates the request
+  * processor. A configuration failure is logged and printed, then the JVM is
+  * terminated.
+  */
+ public FileServer() {
+     PropertyConfigurator.configure("config/logging.properties");
+
+     try {
+         config = new CompositeConfiguration();
+         config.addConfiguration(new SystemConfiguration());
+         config.addConfiguration(new PropertiesConfiguration("config/server.properties"));
+
+         Util.setDirectory(
+                 config.getString("fileserver.data"),
+                 config.getString("fileserver.services"),
+                 config.getString("fileserver.logs"),
+                 config.getString("fileserver.archive"));
+         server = new FileServerProcessor();
+     } catch (final ConfigurationException failure) {
+         LOG.error("configuration failure", failure);
+         System.out.println(failure.getMessage());
+         System.exit(0);
+     }
+ }
+
+ /**
+  * Starts the server in normal mode: one thread each for the control
+  * socket, the service socket and log rolling, plus a sync thread when
+  * {@code fileserver.sync} is enabled.
+  */
+ private void startNormal() {
+     new Thread(new Runnable() {
+         @Override
+         public void run() {
+             startControl();
+         }
+     }, "control").start();
+
+     new Thread(new Runnable() {
+         @Override
+         public void run() {
+             startService();
+         }
+     }, "service").start();
+
+     new Thread(new Runnable() {
+         @Override
+         public void run() {
+             startLogRolling();
+         }
+     }, "log-rolling").start();
+
+     final boolean syncEnabled = config.getBoolean("fileserver.sync", false);
+     if (!syncEnabled) {
+         LOG.info("not syncing to secondary server");
+         return;
+     }
+     new Thread(new Runnable() {
+         @Override
+         public void run() {
+             startSyncing();
+         }
+     }, "sync").start();
+ }
+
+ /**
+  * Runs the service loop: binds the service socket, replays the most recent
+  * recovery log (if any), starts the processor and then accepts and serves
+  * connections one at a time until shutdown is requested. While the server
+  * is quiescent no connections are accepted. Any start-up failure is logged
+  * and terminates the JVM.
+  */
+ private void startService() {
+     final String serviceHost = config.getString("fileserver.host", DEFAULT_HOST);
+     final int servicePort = config.getInt("fileserver.port", DEFAULT_SERVICE_PORT);
+     final int connectionTimeout = config.getInt("fileserver.connection.timeout", 5000);
+     final int readTimeout = config.getInt("fileserver.read.timeout", 5000);
+
+     ServerSocket socket = null;
+     try {
+         LOG.debug("setting up service socket on " + serviceHost + ":" + servicePort);
+         final InetAddress address = InetAddress.getByName(serviceHost);
+         socket = new ServerSocket(servicePort, BACKLOG, address);
+         // accept() times out periodically so the loop can re-check the shutdown flag
+         socket.setSoTimeout(connectionTimeout);
+         LOG.info("file service listenting on " + socket.getInetAddress().getHostAddress() + " port " + socket.getLocalPort());
+         LOG.debug("file service listenting on " + socket);
+         final LogRange logFileRange = Util.logFileRange();
+         if (!logFileRange.noLogFile()) {
+             final long lastRecoveryFile = logFileRange.getLast();
+             final File file = Util.logFile(lastRecoveryFile);
+             LOG.info("replaying last recovery file: " + file.getAbsolutePath());
+             recover(file);
+         }
+         server.startup();
+     } catch (final UnknownHostException e) {
+         LOG.error("Unknown host " + serviceHost, e);
+         System.exit(0);
+     } catch (final IOException e) {
+         LOG.error("start failure - networking not set up for " + serviceHost, e);
+         System.exit(0);
+     } catch (final RuntimeException e) {
+         LOG.error("start failure", e);
+         System.exit(0);
+     }
+     try {
+         do {
+             try {
+                 while (isQuiescent) {
+                     try {
+                         Thread.sleep(300);
+                     } catch (final InterruptedException ignore) {
+                     }
+                 }
+                 final Socket connection = socket.accept();
+                 LOG.debug("connection from " + connection);
+                 connection.setSoTimeout(readTimeout);
+                 serviceConnection(connection, readTimeout);
+             } catch (final SocketTimeoutException expected) {
+             } catch (final IOException e) {
+                 LOG.error("networking problem", e);
+             }
+         } while (awaitConnections);
+     } finally {
+         // Release the listening port once the loop ends instead of holding
+         // it until the JVM exits.
+         if (socket != null) {
+             try {
+                 socket.close();
+             } catch (final IOException e) {
+                 LOG.warn("failure to close service socket", e);
+             }
+         }
+     }
+ }
+
+ /**
+  * Serves a single client connection: wraps its streams, counts the
+  * request, lets the processor handle it and always closes the socket
+  * afterwards. Failures are logged, never propagated.
+  *
+  * @param connection  the accepted client socket
+  * @param readTimeout the read timeout in milliseconds, used only for the log message
+  */
+ private void serviceConnection(final Socket connection, final int readTimeout) {
+     try {
+         final ServerConnection pipe = new ServerConnection(connection.getInputStream(), connection.getOutputStream());
+         requests++;
+         server.process(pipe);
+         pipe.logComplete();
+     } catch (final NoSqlStoreException storeFailure) {
+         final boolean timedOut = storeFailure.getCause() instanceof SocketTimeoutException;
+         if (!timedOut) {
+             LOG.error("file server failure", storeFailure);
+         } else {
+             LOG.error("read timed out after " + (readTimeout / 1000.0) + " seconds", storeFailure);
+         }
+     } catch (final IOException ioFailure) {
+         LOG.error("networking failure", ioFailure);
+     } catch (final RuntimeException unexpected) {
+         LOG.error("request failure", unexpected);
+     } finally {
+         try {
+             connection.close();
+         } catch (final IOException closeFailure) {
+             LOG.warn("failure to close connection", closeFailure);
+         }
+     }
+ }
+
+ /**
+  * Pushes completed recovery logs to the secondary server. After connecting
+  * it sends INIT, reads the id of the last log the secondary already holds,
+  * and then repeatedly sends each subsequent log that exists and has been
+  * fully written: a RECOVERY_LOG marker, the log id, the content as
+  * length-prefixed chunks terminated by a zero length, and a CRC32 of the
+  * bytes sent since the marker. On any failure the connection is closed and
+  * re-established for as long as the server is running.
+  */
+ private void startSyncing() {
+     final String syncHost = config.getString("fileserver.sync-host", DEFAULT_HOST);
+     final int syncPort = config.getInt("fileserver.sync-port", DEFAULT_SYNC_PORT);
+     final int connectionTimeout = config.getInt("fileserver.connection.timeout", 5000);
+
+     LOG.info("preparing to sync to secondary server on " + syncHost + " port " + syncPort);
+
+     final InetAddress address;
+     try {
+         address = InetAddress.getByName(syncHost);
+     } catch (final UnknownHostException e) {
+         LOG.error("Unknown host " + syncHost, e);
+         System.exit(0);
+         return;
+     }
+
+     while (awaitConnections) {
+         Socket socket = null;
+         try {
+             socket = new Socket(address, syncPort);
+             // report the remote port we connected to, not our ephemeral local port
+             LOG.info("sync connected to " + socket.getInetAddress().getHostAddress() + " port " + socket.getPort());
+
+             final CRC32 crc32 = new CRC32();
+             final DataOutput output = new DataOutputStream(new CheckedOutputStream(socket.getOutputStream(), crc32));
+             final DataInput input = new DataInputStream(socket.getInputStream());
+             output.writeByte(INIT);
+             long logId = input.readLong();
+             do {
+                 final long nextLogId = logId + 1;
+                 final File file = Util.logFile(nextLogId);
+                 if (file.exists() && server.getLogger().isWritten(nextLogId)) {
+                     logId++;
+
+                     output.writeByte(RECOVERY_LOG);
+                     // the checksum covers the log id and the chunked content
+                     crc32.reset();
+                     output.writeLong(logId);
+
+                     LOG.info("sending recovery file: " + file.getName());
+                     final BufferedInputStream fileInput = new BufferedInputStream(new FileInputStream(file));
+                     try {
+                         final byte[] buffer = new byte[8092];
+                         int read;
+                         while ((read = fileInput.read(buffer)) > 0) {
+                             output.writeInt(read);
+                             output.write(buffer, 0, read);
+                         }
+                     } finally {
+                         // previously never closed: one file handle leaked per log sent
+                         fileInput.close();
+                     }
+                     output.writeInt(0);
+
+                     output.writeLong(crc32.getValue());
+                 }
+                 try {
+                     Thread.sleep(300);
+                 } catch (final InterruptedException ignore) {
+                 }
+
+                 while (isQuiescent) {
+                     try {
+                         Thread.sleep(300);
+                     } catch (final InterruptedException ignore) {
+                     }
+                 }
+             } while (awaitConnections);
+
+         } catch (final ConnectException e) {
+             LOG.warn("not yet connected to secondary server at " + syncHost + " port " + syncPort);
+             try {
+                 Thread.sleep(connectionTimeout);
+             } catch (final InterruptedException ignore) {
+             }
+         } catch (final IOException e) {
+             LOG.error("start failure - networking not set up for " + syncHost, e);
+             try {
+                 Thread.sleep(300);
+             } catch (final InterruptedException ignore) {
+             }
+         } catch (final RuntimeException e) {
+             LOG.error("start failure", e);
+             try {
+                 Thread.sleep(300);
+             } catch (final InterruptedException ignore) {
+             }
+         } finally {
+             // previously the socket was abandoned on every failure or reconnect
+             if (socket != null) {
+                 try {
+                     socket.close();
+                 } catch (final IOException e) {
+                     LOG.warn("failure to close sync connection", e);
+                 }
+             }
+         }
+     }
+
+ }
+
+ /**
+  * Runs the control loop: binds the control socket and serves control
+  * connections one at a time until shutdown is requested. A start-up
+  * failure is logged and terminates the JVM.
+  */
+ private void startControl() {
+     final String controlHost = config.getString("fileserver.control-host", DEFAULT_HOST);
+     final int controlPort = config.getInt("fileserver.control-port", DEFAULT_CONTROL_PORT);
+     final int connectionTimeout = config.getInt("fileserver.connection.timeout", 5000);
+
+     ServerSocket socket = null;
+     try {
+         LOG.debug("setting up control socket on " + controlHost + ":" + controlPort);
+         final InetAddress address = InetAddress.getByName(controlHost);
+         socket = new ServerSocket(controlPort, BACKLOG, address);
+         // accept() times out periodically so the loop can re-check the shutdown flag
+         socket.setSoTimeout(connectionTimeout);
+         LOG.info("file control listenting on " + socket.getInetAddress().getHostAddress() + " port " + socket.getLocalPort());
+         LOG.debug("file control listenting on " + socket);
+     } catch (final UnknownHostException e) {
+         LOG.error("Unknown host " + controlHost, e);
+         System.exit(0);
+     } catch (final IOException e) {
+         LOG.error("start failure - networking not set up for " + controlHost, e);
+         System.exit(0);
+     } catch (final RuntimeException e) {
+         LOG.error("start failure", e);
+         System.exit(0);
+     }
+     try {
+         do {
+             try {
+                 final Socket connection = socket.accept();
+                 LOG.info("control connection from " + connection);
+                 controlConnection(connection);
+             } catch (final SocketTimeoutException expected) {
+             } catch (final IOException e) {
+                 LOG.error("networking problem", e);
+             }
+         } while (awaitConnections);
+     } finally {
+         // Release the listening port once the loop ends instead of holding
+         // it until the JVM exits.
+         if (socket != null) {
+             try {
+                 socket.close();
+             } catch (final IOException e) {
+                 LOG.warn("failure to close control socket", e);
+             }
+         }
+     }
+ }
+
+ /**
+  * Serves one interactive control session. Commands are read line by line:
+  * {@code shutdown} and {@code quit} end the session; {@code quiesce},
+  * {@code resume}, {@code status} and unknown input are answered and
+  * followed by a fresh {@code "> "} prompt. The socket is always closed at
+  * the end and failures are logged, never propagated.
+  *
+  * @param connection the accepted control socket
+  */
+ private void controlConnection(final Socket connection) {
+     try {
+         final BufferedReader commands = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+         final PrintWriter reply = new PrintWriter(connection.getOutputStream());
+         reply.print("> ");
+         reply.flush();
+
+         for (String command = commands.readLine(); command != null; command = commands.readLine()) {
+             // session-ending commands first: no prompt is printed after them
+             if ("shutdown".equals(command)) {
+                 awaitConnections = false;
+                 reply.println("Server shutdown initiated...");
+                 reply.flush();
+                 server.shutdown();
+                 break;
+             }
+             if ("quit".equals(command)) {
+                 reply.println("Bye");
+                 reply.flush();
+                 break;
+             }
+
+             if ("quiesce".equals(command)) {
+                 isQuiescent = true;
+                 final String message = "Placing server in a quiescent state";
+                 LOG.info(message);
+                 reply.println(message);
+             } else if ("resume".equals(command)) {
+                 if (!isQuiescent) {
+                     reply.println("Can't resume as not currently in a quiescent state");
+                 } else {
+                     isQuiescent = false;
+                     final String message = "Resuming from a quiescent state";
+                     LOG.info(message);
+                     reply.println(message);
+                 }
+             } else if ("status".equals(command)) {
+                 reply.println("requests: " + requests);
+                 reply.println("quiescent: " + isQuiescent);
+             } else {
+                 reply.println("Unknown command, valid commands are: quit, quiesce, status, resume, shutdown");
+             }
+
+             // every command that keeps the session open ends with a new prompt
+             reply.print("> ");
+             reply.flush();
+         }
+     } catch (final IOException ioFailure) {
+         LOG.error("networking failure", ioFailure);
+     } catch (final RuntimeException unexpected) {
+         LOG.error("request failure", unexpected);
+     } finally {
+         try {
+             connection.close();
+         } catch (final IOException closeFailure) {
+             LOG.warn("failure to close connection", closeFailure);
+         }
+     }
+ }
+
+ /**
+  * Replays recovery logs into the data directory. With no arguments only
+  * the last log is replayed; the first argument selects the starting log id
+  * and the second the ending log id. Invalid ids print an error and
+  * terminate the JVM.
+  *
+  * @param list the optional start and end log ids, as strings
+  */
+ private void startRecovery(final List<String> list) {
+     LOG.info("starting recovery");
+     final LogRange logFileRange = Util.logFileRange();
+     if (logFileRange.noLogFile()) {
+         System.err.println("No recovery files found");
+         System.exit(0);
+     }
+     final long lastId = logFileRange.getLast();
+     LOG.info("last log file is " + Util.logFile(lastId).getName());
+
+     // both ends default to the last log when not given on the command line
+     final int argCount = list.size();
+     final long startId = argCount > 0 ? Long.parseLong(list.get(0)) : lastId;
+     final long endId = argCount > 1 ? Long.parseLong(list.get(1)) : lastId;
+
+     final boolean withinRange = startId >= logFileRange.getFirst() && startId <= lastId && endId <= lastId;
+     if (!withinRange) {
+         System.err.println("File IDs invalid: they must be between " + logFileRange.getFirst() + " and " + lastId);
+         System.exit(0);
+     }
+     if (startId > endId) {
+         System.err.println("File IDs invalid: start must be before the end");
+         System.exit(0);
+     }
+
+     Util.ensureDirectoryExists();
+     long currentId = startId;
+     while (currentId <= endId) {
+         final File logFile = Util.logFile(currentId);
+         LOG.info("recovering data from " + logFile.getName());
+         recover(logFile);
+         currentId++;
+     }
+     LOG.info("recovery complete");
+ }
+
+ /**
+  * Moves recovery logs to the archive location, from the first log up to
+  * and including the given id. Without an argument everything except the
+  * last log is archived; the last log itself can never be archived.
+  *
+  * @param list optionally holds the id of the last log to archive, as a string
+  */
+ private void startArchive(final List<String> list) {
+     LOG.info("starting archiving");
+     final LogRange logFileRange = Util.logFileRange();
+     if (logFileRange.noLogFile()) {
+         System.err.println("No recovery files found");
+         System.exit(0);
+     }
+     final long lastId = logFileRange.getLast();
+     LOG.info("last log file is " + Util.logFile(lastId).getName());
+
+     long endId = lastId - 1;
+
+     final int size = list.size();
+     if (size > 0) {
+         endId = Long.valueOf(list.get(0));
+     }
+     if (endId >= lastId) {
+         System.err.println("File ID invalid: they must be less than " + lastId);
+         System.exit(0);
+     }
+     final long startId = logFileRange.getFirst();
+     for (long id = startId; id <= endId; id++) {
+         final File file = Util.logFile(id);
+         LOG.info("moving " + file.getName());
+         final File destination = Util.archiveLogFile(id);
+         // renameTo reports failure only through its return value; surface it
+         // rather than silently claiming the archive succeeded
+         if (!file.renameTo(destination)) {
+             LOG.warn("failed to move " + file.getAbsolutePath() + " to " + destination.getAbsolutePath());
+         }
+     }
+     LOG.info("archive complete");
+
+ }
+
+ /**
+  * Runs the server as a secondary: listens on the sync host and port and
+  * handles incoming sync connections one at a time. A failure here is
+  * logged and terminates the JVM.
+  */
+ private void startSecondary() {
+     final String syncHost = config.getString("fileserver.sync-host", DEFAULT_HOST);
+     final int syncPort = config.getInt("fileserver.sync-port", DEFAULT_SYNC_PORT);
+
+     Util.ensureDirectoryExists();
+     ServerSocket listener = null;
+     try {
+         LOG.debug("setting up syncing socket on " + syncHost + ":" + syncPort);
+         final InetAddress bindAddress = InetAddress.getByName(syncHost);
+         listener = new ServerSocket(syncPort, 0, bindAddress);
+         LOG.info("listenting on " + listener.getInetAddress().getHostAddress() + " port " + listener.getLocalPort());
+         LOG.debug("listenting on " + listener);
+         do {
+             final Socket primary = listener.accept();
+             syncConnection(primary, 0);
+         } while (awaitConnections);
+     } catch (final UnknownHostException unknownHost) {
+         LOG.error("Unknown host " + syncHost, unknownHost);
+         System.exit(0);
+     } catch (final IOException ioFailure) {
+         LOG.error("start failure - networking not set up for " + syncHost, ioFailure);
+         System.exit(0);
+     } catch (final RuntimeException unexpected) {
+         LOG.error("start failure", unexpected);
+         System.exit(0);
+     }
+ }
+
+ /**
+  * Handles one sync session on the secondary. Expects INIT, replies with
+  * the id of the last log held locally (-1 if none), then receives logs
+  * until the peer stops sending RECOVERY_LOG markers or the connection
+  * fails. Each log is written to a temporary file, verified against the
+  * CRC32 sent by the peer, replayed and finally renamed to its permanent
+  * name. The socket is always closed; failures are logged, never propagated.
+  *
+  * @param connection  the accepted sync socket
+  * @param readTimeout currently unused
+  */
+ private void syncConnection(final Socket connection, final int readTimeout) {
+     try {
+         final CRC32 crc32 = new CRC32();
+         final DataOutput output = new DataOutputStream(connection.getOutputStream());
+         final DataInput input = new DataInputStream(new CheckedInputStream(connection.getInputStream(), crc32));
+
+         if (input.readByte() != INIT) {
+             return;
+         }
+
+         final LogRange logFileRange = Util.logFileRange();
+         final long lastId = logFileRange.noLogFile() ? -1 : logFileRange.getLast();
+         output.writeLong(lastId);
+         do {
+             if (input.readByte() != RECOVERY_LOG) {
+                 return;
+             }
+             // the checksum covers the log id and the chunked content
+             crc32.reset();
+             final long logId = input.readLong();
+             final File file = Util.tmpLogFile(logId);
+             LOG.info("syncing recovery file: " + file.getName());
+
+             final byte[] buffer = new byte[8092];
+             final BufferedOutputStream fileOutput = new BufferedOutputStream(new FileOutputStream(file));
+             try {
+                 int length;
+                 while ((length = input.readInt()) > 0) {
+                     input.readFully(buffer, 0, length);
+                     fileOutput.write(buffer, 0, length);
+                 }
+             } finally {
+                 // closed on the failure path too, so an aborted transfer no
+                 // longer leaks the file handle; must be closed before recover()
+                 fileOutput.close();
+             }
+
+             final long calculatedChecksum = crc32.getValue();
+             final long sentChecksum = input.readLong();
+             if (calculatedChecksum != sentChecksum) {
+                 throw new NoSqlStoreException("Checksum didn't match during download of " + file.getName());
+             }
+
+             recover(file);
+             final File renameTo = Util.logFile(logId);
+             // renameTo reports failure only through its return value
+             if (!file.renameTo(renameTo)) {
+                 LOG.warn("failed to rename " + file.getAbsolutePath() + " to " + renameTo.getAbsolutePath());
+             }
+         } while (true);
+     } catch (final NoSqlStoreException e) {
+         LOG.error("file server failure", e);
+     } catch (final IOException e) {
+         LOG.error("networking failure", e);
+     } catch (final RuntimeException e) {
+         LOG.error("request failure", e);
+     } finally {
+         try {
+             connection.close();
+         } catch (final IOException e) {
+             LOG.warn("failure to close connection", e);
+         }
+     }
+
+     // TODO restart
+ }
+
+ /**
+  * Replays every transaction recorded in the given recovery log. Each
+  * transaction must begin with a {@code #transaction started} line.
+  *
+  * @param file the recovery log to replay
+  * @throws NoSqlStoreException if a transaction start is missing, or the
+  *                             file cannot be read or closed
+  */
+ private void recover(final File file) {
+     LineNumberReader log = null;
+     try {
+         log = new LineNumberReader(new InputStreamReader(new FileInputStream(file), Util.ENCODING));
+
+         String line;
+         while ((line = log.readLine()) != null) {
+             if (line.startsWith("#transaction started")) {
+                 readTransaction(log);
+                 continue;
+             }
+             throw new NoSqlStoreException("No transaction start found: " + line + " (" + log.getLineNumber() + ")");
+         }
+     } catch (final IOException readFailure) {
+         throw new NoSqlStoreException(readFailure);
+     } finally {
+         if (log != null) {
+             try {
+                 log.close();
+             } catch (final IOException closeFailure) {
+                 throw new NoSqlStoreException(closeFailure);
+             }
+         }
+     }
+ }
+
+ /**
+  * Reads one transaction from the recovery log. Service ({@code S}) and
+  * batch ({@code B}) entries are applied to the processor as they are read;
+  * all other entries are collected and written to the data files only when
+  * the {@code #transaction ended} marker is reached. A transaction without
+  * that marker is logged as incomplete and its collected entries are dropped.
+  *
+  * @param reader the log, positioned just after the transaction start line
+  * @throws IOException if the log cannot be read or a data file cannot be written
+  */
+ private void readTransaction(final LineNumberReader reader) throws IOException {
+     final ArrayList<FileContent> pending = new ArrayList<FileContent>();
+     final DataFileWriter dataWriter = new DataFileWriter(pending);
+
+     for (String header = reader.readLine(); header != null; header = reader.readLine()) {
+         if (header.startsWith("#transaction ended")) {
+             LOG.debug("transaction read in (ending " + reader.getLineNumber() + ")");
+             dataWriter.writeData();
+             // skip the line that follows the end marker
+             reader.readLine();
+             return;
+         }
+
+         if (header.startsWith("S")) {
+             final String[] parts = header.substring(1).split(" ");
+             final String key = parts[0];
+             final String name = parts[1];
+             server.saveService(key, name);
+             reader.readLine();
+             continue;
+         }
+
+         if (header.startsWith("B")) {
+             final String[] parts = header.substring(1).split(" ");
+             final String name = parts[0];
+             final long nextBatch = Long.parseLong(parts[1]);
+             server.saveNextBatch(name, nextBatch);
+             reader.readLine();
+             continue;
+         }
+
+         pending.add(readElementData(header, reader));
+     }
+     LOG.warn("transaction has no ending marker so is incomplete and will not be restored (ending " + reader.getLineNumber() + ")");
+ }
+
+ /**
+  * Reads the body of one data entry (every line up to the next empty line
+  * or the end of the log) and combines it with the entry's header, whose
+  * form is the command character followed by {@code type id version}.
+  *
+  * @param header the entry's header line
+  * @param reader the log, positioned just after the header
+  * @return the entry, with no current version and the header's version as the new version
+  * @throws IOException if the log cannot be read
+  */
+ private FileContent readElementData(final String header, final LineNumberReader reader) throws IOException {
+     final StringBuilder body = new StringBuilder();
+     for (String line = reader.readLine(); line != null && line.length() != 0; line = reader.readLine()) {
+         body.append(line).append('\n');
+     }
+
+     final char command = header.charAt(0);
+     final String[] fields = header.substring(1).split(" ");
+     final String type = fields[0];
+     final String id = fields[1];
+     final String version = fields[2];
+     return new FileContent(command, id, null, version, type, body.toString());
+ }
+
+ /**
+  * Periodically starts a new recovery log file. The period is read from
+  * {@code fileserver.log-period} in minutes (default 5). Runs until
+  * shutdown is requested.
+  */
+ private void startLogRolling() {
+     final int rollPeriod = config.getInt("fileserver.log-period", 5);
+     // long arithmetic: in int arithmetic the product overflows for periods
+     // above roughly 35791 minutes and yields a bogus sleep time
+     final long sleepTime = rollPeriod * 60L * 1000L;
+
+     while (awaitConnections) {
+         final LogWriter logger = server.getLogger();
+         if (logger != null) {
+             logger.startNewFile();
+         }
+         try {
+             Thread.sleep(sleepTime);
+         } catch (final InterruptedException ignore) {
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/isis/blob/0861ed93/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/FileServerException.java
----------------------------------------------------------------------
diff --git a/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/FileServerException.java b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/FileServerException.java
new file mode 100644
index 0000000..dc0a2fc
--- /dev/null
+++ b/framework/component/objectstore/nosql/src/main/java/org/apache/isis/runtimes/dflt/objectstores/nosql/db/file/server/FileServerException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.runtimes.dflt.objectstores.nosql.db.file.server;
+
+import java.io.IOException;
+
+import org.apache.isis.core.commons.exceptions.IsisException;
+
+/**
+ * An {@link IsisException} raised by the file server, created either from a
+ * message alone or from a message together with the causing
+ * {@link IOException}.
+ */
+public class FileServerException extends IsisException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message description of the failure
+     */
+    public FileServerException(final String message) {
+        super(message);
+    }
+
+    /**
+     * @param message description of the failure
+     * @param e       the I/O failure that caused it
+     */
+    public FileServerException(final String message, final IOException e) {
+        super(message, e);
+    }
+
+}