You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/12/07 10:22:43 UTC
[1/2] git commit: [HELIX-13] Initial commit for near real time rsync
replicated file system
Updated Branches:
refs/heads/master 4b0688333 -> 342ea1c27
[HELIX-13] Initial commit for near real time rsync replicated file system
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/342ea1c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/342ea1c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/342ea1c2
Branch: refs/heads/master
Commit: 342ea1c277651a6e3cf85b10949c2953d93618dd
Parents: 4b06883
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Dec 7 01:21:09 2012 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Dec 7 01:22:25 2012 -0800
----------------------------------------------------------------------
recipes/rsync-replicated-file-system/README.md | 1 +
recipes/rsync-replicated-file-system/bin/log4j.xml | 19 +
.../bin/send-message.sh | 8 +
.../bin/setup-cluster.sh | 8 +
.../bin/start-cluster-manager.sh | 8 +
.../bin/start-consumer.sh | 8 +
.../data/localhost_12000/translog/log.0 | Bin 0 -> 126 bytes
.../checkpoint/lastprocessedchange.checkpoint | 1 +
.../checkpoint/lastprocessedchange.checkpoint.bak | 1 +
.../data/localhost_12001/translog/log.0 | Bin 0 -> 126 bytes
recipes/rsync-replicated-file-system/pom.xml | 109 ++++
.../java/org/apache/helix/filestore/ChangeLog.java | 13 +
.../apache/helix/filestore/ChangeLogGenerator.java | 130 +++++
.../apache/helix/filestore/ChangeLogProcessor.java | 93 ++++
.../apache/helix/filestore/ChangeLogReader.java | 150 ++++++
.../org/apache/helix/filestore/ChangeRecord.java | 70 +++
.../org/apache/helix/filestore/CheckpointFile.java | 76 +++
.../apache/helix/filestore/ExternalCommand.java | 401 +++++++++++++++
.../apache/helix/filestore/FileChangeWatcher.java | 11 +
.../java/org/apache/helix/filestore/FileStore.java | 124 +++++
.../helix/filestore/FileStoreStateModel.java | 221 ++++++++
.../filestore/FileStoreStateModelFactory.java | 21 +
.../helix/filestore/FileSystemWatchService.java | 171 ++++++
.../apache/helix/filestore/IntegrationTest.java | 201 ++++++++
.../org/apache/helix/filestore/Replicator.java | 166 ++++++
.../org/apache/helix/filestore/RsyncDaemon.java | 29 +
.../org/apache/helix/filestore/RsyncInvoker.java | 114 ++++
.../org/apache/helix/filestore/SetupCluster.java | 58 ++
.../helix/filestore/StartClusterManager.java | 42 ++
.../main/java/org/apache/helix/filestore/Test.java | 143 +++++
30 files changed, 2397 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/README.md
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/README.md b/recipes/rsync-replicated-file-system/README.md
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/README.md
@@ -0,0 +1 @@
+
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/bin/log4j.xml
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/bin/log4j.xml b/recipes/rsync-replicated-file-system/bin/log4j.xml
new file mode 100644
index 0000000..09ed7cd
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/bin/log4j.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="ConsoleAppender" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.err" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c{1}] [perf-test] %m%n" />
+ </layout>
+ </appender>
+
+ <root>
+ <priority value="debug"/>
+ <appender-ref ref="ConsoleAppender"/>
+ </root>
+
+</log4j:configuration>
+
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/bin/send-message.sh
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/bin/send-message.sh b/recipes/rsync-replicated-file-system/bin/send-message.sh
new file mode 100755
index 0000000..89ee65a
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/bin/send-message.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+script_dir=`dirname $0`
+LIB=$script_dir/../lib
+CLASSPATH=$script_dir/../target/classes:"$LIB"/helix-core-0.1-SNAPSHOT-incubating.jar:"$LIB"/rabbitmq-client.jar:"$LIB"/commons-cli-1.1.jar:"$LIB"/commons-io-1.2.jar:"$LIB"/commons-math-2.1.jar:"$LIB"/jackson-core-asl-1.8.5.jar:"$LIB"/jackson-mapper-asl-1.8.5.jar:"$LIB"/log4j-1.2.15.jar:"$LIB"/org.restlet-1.1.10.jar:"$LIB"/zkclient-0.1.jar:"$LIB"/zookeeper-3.3.4.jar
+# echo $CLASSPATH
+
+java -cp "$CLASSPATH" org.apache.helix.filestore.Emitter $@
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/bin/setup-cluster.sh
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/bin/setup-cluster.sh b/recipes/rsync-replicated-file-system/bin/setup-cluster.sh
new file mode 100755
index 0000000..b924182
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/bin/setup-cluster.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+script_dir=`dirname $0`
+LIB=$script_dir/../lib
+CLASSPATH=$script_dir/../target/classes:"$LIB"/helix-core-0.1-SNAPSHOT-incubating.jar:"$LIB"/rabbitmq-client.jar:"$LIB"/commons-cli-1.1.jar:"$LIB"/commons-io-1.2.jar:"$LIB"/commons-math-2.1.jar:"$LIB"/jackson-core-asl-1.8.5.jar:"$LIB"/jackson-mapper-asl-1.8.5.jar:"$LIB"/log4j-1.2.15.jar:"$LIB"/org.restlet-1.1.10.jar:"$LIB"/zkclient-0.1.jar:"$LIB"/zookeeper-3.3.4.jar
+# echo $CLASSPATH
+
+java -cp "$CLASSPATH" org.apache.helix.filestore.SetupConsumerCluster $@
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/bin/start-cluster-manager.sh
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/bin/start-cluster-manager.sh b/recipes/rsync-replicated-file-system/bin/start-cluster-manager.sh
new file mode 100755
index 0000000..e8e0698
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/bin/start-cluster-manager.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+script_dir=`dirname $0`
+LIB=$script_dir/../lib
+CLASSPATH=$script_dir/../target/classes:"$LIB"/helix-core-0.1-SNAPSHOT-incubating.jar:"$LIB"/rabbitmq-client.jar:"$LIB"/commons-cli-1.1.jar:"$LIB"/commons-io-1.2.jar:"$LIB"/commons-math-2.1.jar:"$LIB"/jackson-core-asl-1.8.5.jar:"$LIB"/jackson-mapper-asl-1.8.5.jar:"$LIB"/log4j-1.2.15.jar:"$LIB"/org.restlet-1.1.10.jar:"$LIB"/zkclient-0.1.jar:"$LIB"/zookeeper-3.3.4.jar
+# echo $CLASSPATH
+
+java -cp "$CLASSPATH" org.apache.helix.filestore.StartClusterManager $@
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/bin/start-consumer.sh
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/bin/start-consumer.sh b/recipes/rsync-replicated-file-system/bin/start-consumer.sh
new file mode 100755
index 0000000..01b2ac6
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/bin/start-consumer.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+script_dir=`dirname $0`
+LIB=$script_dir/../lib
+CLASSPATH=$script_dir/../target/classes:"$LIB"/helix-core-0.1-SNAPSHOT-incubating.jar:"$LIB"/rabbitmq-client.jar:"$LIB"/commons-cli-1.1.jar:"$LIB"/commons-io-1.2.jar:"$LIB"/commons-math-2.1.jar:"$LIB"/jackson-core-asl-1.8.5.jar:"$LIB"/jackson-mapper-asl-1.8.5.jar:"$LIB"/log4j-1.2.15.jar:"$LIB"/org.restlet-1.1.10.jar:"$LIB"/zkclient-0.1.jar:"$LIB"/zookeeper-3.3.4.jar
+# echo $CLASSPATH
+
+java -cp "$CLASSPATH" org.apache.helix.filestore.FileStore $@
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/a
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/a b/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/a
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/b
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/b b/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/b
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/c
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/c b/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/c
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/d
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/d b/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/d
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/e
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/e b/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/e
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/f
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/f b/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/f
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.0
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.0 b/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.0
new file mode 100644
index 0000000..d73406d
Binary files /dev/null and b/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.0 differ
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.1
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.1 b/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.1
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint b/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint
new file mode 100644
index 0000000..77320cb
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint
@@ -0,0 +1 @@
+2147483655|1354784309854|f|log.0|105|126
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint.bak
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint.bak b/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint.bak
new file mode 100644
index 0000000..9f9165f
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint.bak
@@ -0,0 +1 @@
+2147483654|1354784147820|e|log.0|84|105
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/a
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/a b/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/a
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/b
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/b b/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/b
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/c
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/c b/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/c
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/d
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/d b/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/d
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/e
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/e b/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/e
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/f
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/f b/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/f
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/translog/log.0
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/translog/log.0 b/recipes/rsync-replicated-file-system/data/localhost_12001/translog/log.0
new file mode 100644
index 0000000..826dd1c
Binary files /dev/null and b/recipes/rsync-replicated-file-system/data/localhost_12001/translog/log.0 differ
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/pom.xml b/recipes/rsync-replicated-file-system/pom.xml
new file mode 100644
index 0000000..26b52f9
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/pom.xml
@@ -0,0 +1,109 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.linkedin.helix</groupId>
+ <artifactId>rsync-replicated-file-system</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-SNAPSHOT</version>
+ <name>rsync-replicated-file-system</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.15</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.mail</groupId>
+ <artifactId>mail</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-jci-fam</artifactId>
+ <version>1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.sgroschupf</groupId>
+ <artifactId>zkclient</artifactId>
+ <version>0.1</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <version>1.1.1</version>
+ <configuration>
+ <!-- Set the target configuration directory to be used in the bin scripts -->
+ <!-- <configurationDirectory>conf</configurationDirectory> -->
+ <!-- Copy the contents from "/src/main/config" to the target configuration
+ directory in the assembled application -->
+ <!-- <copyConfigurationDirectory>true</copyConfigurationDirectory> -->
+ <!-- Include the target configuration directory in the beginning of
+ the classpath declaration in the bin scripts -->
+ <includeConfigurationDirectoryInClasspath>true</includeConfigurationDirectoryInClasspath>
+ <assembleDirectory>${project.build.directory}/${project.artifactId}-pkg</assembleDirectory>
+ <!-- Extra JVM arguments that will be included in the bin scripts -->
+ <extraJvmArguments>-Xms512m -Xmx512m</extraJvmArguments>
+ <!-- Generate bin scripts for windows and unix pr default -->
+ <platforms>
+ <platform>windows</platform>
+ <platform>unix</platform>
+ </platforms>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>org.apache.helix.filestore.IntegrationTest</mainClass>
+ <name>quickdemo</name>
+ </program>
+
+ </programs>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLog.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLog.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLog.java
new file mode 100644
index 0000000..1f41a66
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLog.java
@@ -0,0 +1,13 @@
+package org.apache.helix.filestore;
+
+public class ChangeLog
+{
+ public ChangeLog(String logDir){
+
+ }
+
+ public void logChange(String file)
+ {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogGenerator.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogGenerator.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogGenerator.java
new file mode 100644
index 0000000..28a7e97
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogGenerator.java
@@ -0,0 +1,130 @@
+package org.apache.helix.filestore;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.nio.charset.Charset;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.helix.filestore.FileSystemWatchService.ChangeType;
+
+public class ChangeLogGenerator implements FileChangeWatcher
+{
+ Lock lock;
+ private long currentSeq;
+ private long currentGen;
+ private int entriesLogged;
+ private DataOutputStream out;
+ private final String directory;
+
+ public ChangeLogGenerator(String directory, long startGen, long startSeq)
+ throws Exception
+ {
+ this.directory = directory;
+ lock = new ReentrantLock();
+ currentSeq = startSeq;
+ currentGen = startGen;
+ setLogFile();
+ }
+
+ private void setLogFile() throws Exception
+ {
+ File file = new File(directory);
+ String[] list = file.list();
+ if(list==null){
+ list = new String[]{};
+ }
+ int max = 1;
+ for (String name : list)
+ {
+ String[] split = name.split(".");
+ if (split.length == 2)
+ {
+ try
+ {
+ int index = Integer.parseInt(split[1]);
+ if (index > max)
+ {
+ max = index;
+ }
+ } catch (NumberFormatException e)
+ {
+ System.err.println("Invalid transaction log file found:" + name);
+ }
+ }
+ }
+
+ String transLogFile = directory+"/"+
+ "log." + (max);
+ System.out.println("Current file name:"+ transLogFile);
+ out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(transLogFile,true)));
+ }
+
+ @Override
+ public void onEntryAdded(String path)
+ {
+ appendChange(path, FileSystemWatchService.ChangeType.CREATE);
+
+ }
+
+ @Override
+ public void onEntryDeleted(String path)
+ {
+ appendChange(path, FileSystemWatchService.ChangeType.DELETE);
+
+ }
+
+ @Override
+ public void onEntryModified(String path)
+ {
+
+ appendChange(path, FileSystemWatchService.ChangeType.MODIFY);
+
+ }
+
+ public boolean appendChange(String path, ChangeType type)
+ {
+ lock.lock();
+ if(new File(path).isDirectory()){
+ return true;
+ }
+ try
+ {
+ ChangeRecord record = new ChangeRecord();
+ record.file = path;
+ record.timestamp = System.currentTimeMillis();
+ currentSeq++;
+ long txnId = (((long)currentGen) << 32) + ((long)currentSeq);
+ record.txid = txnId;
+ record.type = (short) type.ordinal();
+ write(record);
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ return false;
+ } finally
+ {
+ lock.unlock();
+ }
+ return true;
+ }
+
+ private void write(ChangeRecord record) throws Exception
+ {
+ out.writeLong(record.txid);
+ out.writeShort(record.type);
+ out.writeLong(record.timestamp);
+ out.writeUTF(record.file);
+ out.flush();
+ entriesLogged++;
+ if(entriesLogged==10000){
+ entriesLogged=0;
+ out.close();
+ setLogFile();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogProcessor.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogProcessor.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogProcessor.java
new file mode 100644
index 0000000..17ad0d5
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogProcessor.java
@@ -0,0 +1,93 @@
+package org.apache.helix.filestore;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.io.FileUtils;
+
+/**
+ * Processes the change log and invokes rsync for every change on the remote
+ * machine
+ *
+ * @author kgopalak
+ *
+ */
+public class ChangeLogProcessor implements Runnable
+{
+ private final ChangeLogReader reader;
+ RsyncInvoker rsyncInvoker;
+ private AtomicBoolean shutdownRequested;
+ private CheckpointFile checkpointFile;
+ private Thread thread;
+
+ public ChangeLogProcessor(ChangeLogReader reader, String remoteHost,
+ String remoteBaseDir, String localBaseDir, String checkpointDirPath)
+ throws Exception
+ {
+ this.reader = reader;
+ checkpointFile = new CheckpointFile(checkpointDirPath);
+
+ shutdownRequested = new AtomicBoolean(false);
+ rsyncInvoker = new RsyncInvoker(remoteHost, remoteBaseDir, localBaseDir);
+ }
+
+ public void start()
+ {
+ thread = new Thread(this);
+ thread.start();
+ }
+
+ public void run()
+ {
+ try
+ {
+ ChangeRecord lastRecordProcessed = checkpointFile.findLastRecordProcessed();
+ do
+ {
+ try
+ {
+ List<ChangeRecord> changes = reader
+ .getChangeSince(lastRecordProcessed);
+ Set<String> paths = getRemotePathsToSync(changes);
+ for (String path : paths)
+ {
+ rsyncInvoker.rsync(path);
+ }
+ lastRecordProcessed = changes.get(changes.size() - 1);
+ checkpointFile.checkpoint(lastRecordProcessed);
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ } while (!shutdownRequested.get());
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+
+
+ private Set<String> getRemotePathsToSync(List<ChangeRecord> changes)
+ {
+ Set<String> paths = new TreeSet<String>();
+ for (ChangeRecord change : changes)
+ {
+ paths.add(change.file);
+ }
+ return paths;
+ }
+
+ public void stop()
+ {
+ shutdownRequested.set(true);
+ thread.interrupt();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogReader.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogReader.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogReader.java
new file mode 100644
index 0000000..30198e8
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogReader.java
@@ -0,0 +1,150 @@
+package org.apache.helix.filestore;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class ChangeLogReader implements FileChangeWatcher
+{
+ int MAX_ENTRIES_TO_READ = 100;
+ private final String changeLogDir;
+ Lock lock;
+ private Condition condition;
+
+ public ChangeLogReader(String changeLogDir)
+ {
+ this.changeLogDir = changeLogDir;
+ lock = new ReentrantLock();
+ condition = lock.newCondition();
+
+ }
+
+ /**
+ * Blocking call
+ *
+ * @param record
+ * @return
+ */
+ public List<ChangeRecord> getChangeSince(ChangeRecord record)
+ {
+ List<ChangeRecord> changes = new ArrayList<ChangeRecord>();
+ String fileName;
+ long endOffset;
+ if (record == null)
+ {
+ fileName = "log.1";
+ endOffset = 0;
+ } else
+ {
+ fileName = record.changeLogFileName;
+ endOffset = record.endOffset;
+ }
+ try
+ {
+ lock.lock();
+
+ File file;
+ file = new File(changeLogDir + "/" + fileName);
+ while (!file.exists() || file.length() <= endOffset)
+ {
+ // wait
+ try
+ {
+ System.out.println("Waiting for new changes");
+ condition.await();
+ System.out.println("Detected changes");
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ RandomAccessFile raf = new RandomAccessFile(
+ changeLogDir + "/" + fileName, "r");
+ raf.seek(endOffset);
+ // out.writeLong(record.txid);
+ // out.writeShort(record.type);
+ // out.writeLong(record.timestamp);
+ // out.writeUTF(record.file);
+
+ int count = 0;
+ do {
+ ChangeRecord newRecord = new ChangeRecord();
+ newRecord.changeLogFileName = fileName;
+ newRecord.startOffset = raf.getFilePointer();
+ newRecord.txid = raf.readLong();
+ newRecord.type = raf.readShort();
+ newRecord.timestamp = raf.readLong();
+ newRecord.file = raf.readUTF();
+ newRecord.endOffset = raf.getFilePointer();
+ changes.add(newRecord);
+ count++;
+ }while (count < MAX_ENTRIES_TO_READ && raf.getFilePointer()< raf.length());
+ } catch (FileNotFoundException e)
+ {
+ e.printStackTrace();
+ } catch (IOException e)
+ {
+ e.printStackTrace();
+ } finally
+ {
+ lock.unlock();
+ }
+ return changes;
+ }
+
+ @Override
+ public void onEntryModified(String path)
+ {
+ try
+ {
+ lock.lock();
+ condition.signalAll();
+ } catch (Exception e)
+ {
+ // TODO: handle exception
+ } finally
+ {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void onEntryAdded(String path)
+ {
+ try
+ {
+ lock.lock();
+ condition.signalAll();
+ } catch (Exception e)
+ {
+ // TODO: handle exception
+ } finally
+ {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void onEntryDeleted(String path)
+ {
+ try
+ {
+ lock.lock();
+ condition.signalAll();
+ } catch (Exception e)
+ {
+ // TODO: handle exception
+ } finally
+ {
+ lock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeRecord.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeRecord.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeRecord.java
new file mode 100644
index 0000000..567254b
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeRecord.java
@@ -0,0 +1,70 @@
+package org.apache.helix.filestore;
+
+public class ChangeRecord
+{
+ /**
+ * Transaction Id corresponding to the change that increases monotonically. 31
+ * LSB correspond to sequence number that increments every change. 31 MSB
+ * increments when the master changes
+ */
+ long txid;
+ /**
+ * File(s) that were changed
+ */
+ String file;
+
+ /**
+ * Timestamp
+ */
+ long timestamp;
+ /**
+ * Type of event like create, modified, deleted
+ */
+ short type;
+
+ transient String changeLogFileName;
+
+ transient long startOffset;
+
+ transient long endOffset;
+
+ public short fileFieldLength;
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(txid);
+ sb.append("|");
+ sb.append(timestamp);
+ sb.append("|");
+ sb.append(file);
+ sb.append("|");
+ sb.append(changeLogFileName);
+ sb.append("|");
+ sb.append(startOffset);
+ sb.append("|");
+ sb.append(endOffset);
+ return sb.toString();
+ }
+
+ public static ChangeRecord fromString(String line)
+ {
+ ChangeRecord record=null;
+ if (line != null)
+ {
+ String[] split = line.split("\\|");
+ if (split.length == 6)
+ {
+ record = new ChangeRecord();
+ record.txid = Long.parseLong(split[0]);
+ record.timestamp = Long.parseLong(split[1]);
+ record.file = split[2];
+ record.changeLogFileName = split[3];
+ record.startOffset = Long.parseLong(split[4]);
+ record.endOffset = Long.parseLong(split[5]);
+ }
+ }
+ return record;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/CheckpointFile.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/CheckpointFile.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/CheckpointFile.java
new file mode 100644
index 0000000..0d44e51
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/CheckpointFile.java
@@ -0,0 +1,76 @@
+package org.apache.helix.filestore;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+
+public class CheckpointFile
+{
+ private static final String TEMP = ".bak";
+ private static String CHECK_POINT_FILE = "lastprocessedchange.checkpoint";
+
+ private String checkpointDirPath;
+
+ public CheckpointFile(String checkpointDirPath) throws IOException
+ {
+ this.checkpointDirPath = checkpointDirPath;
+ File checkpointdir = new File(checkpointDirPath);
+ if (!checkpointdir.exists() && !checkpointdir.mkdirs())
+ {
+ throw new IOException("unable to create SCN file parent:"
+ + checkpointdir.getAbsolutePath());
+ }
+ }
+
+ public void checkpoint(ChangeRecord lastRecordProcessed) throws Exception
+ {
+
+ // delete the temp file if one exists
+ File tempCheckpointFile = new File(checkpointDirPath + "/" + CHECK_POINT_FILE
+ + TEMP);
+ if (tempCheckpointFile.exists() && !tempCheckpointFile.delete())
+ {
+ System.err.println("unable to erase temp SCN file: "
+ + tempCheckpointFile.getAbsolutePath());
+ }
+
+ String checkpointFileName = checkpointDirPath + "/" + CHECK_POINT_FILE;
+ File checkpointfile = new File(checkpointFileName);
+ if (checkpointfile.exists() && !checkpointfile.renameTo(tempCheckpointFile))
+ {
+ System.err.println("unable to backup scn file");
+ }
+ if (!checkpointfile.createNewFile())
+ {
+ System.err.println("unable to create new SCN file:"
+ + checkpointfile.getAbsolutePath());
+ }
+ FileWriter writer = new FileWriter(checkpointfile);
+ writer.write(lastRecordProcessed.toString());
+ writer.flush();
+ writer.close();
+ System.out.println("scn persisted: " + lastRecordProcessed.txid);
+
+ }
+
+ public ChangeRecord findLastRecordProcessed()
+ {
+ String checkpointFileName = checkpointDirPath + "/" + CHECK_POINT_FILE;
+ File file = new File(checkpointFileName);
+ ChangeRecord record = null;
+ if (file.exists())
+ {
+ try
+ {
+ String line = FileUtils.readFileToString(file);
+ record = ChangeRecord.fromString(line);
+ } catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ return record;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ExternalCommand.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ExternalCommand.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ExternalCommand.java
new file mode 100644
index 0000000..1ee822f
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ExternalCommand.java
@@ -0,0 +1,401 @@
+package org.apache.helix.filestore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.log4j.Logger;
+
+public class ExternalCommand
+{
+ public static final String MODULE = ExternalCommand.class.getName();
+ public static final Logger LOG = Logger.getLogger(MODULE);
+
+ private final ProcessBuilder _processBuilder;
+
+ private Process _process;
+ private InputReader _out;
+ private InputReader _err;
+
+ private static class InputReader extends Thread
+ {
+ private static final int BUFFER_SIZE = 2048;
+
+ private final InputStream _in;
+ private final ByteArrayOutputStream _out;
+ private boolean _running = false;
+
+ InputReader(InputStream in)
+ {
+ _in = in;
+ _out = new ByteArrayOutputStream();
+ }
+
+ @Override
+ public void run()
+ {
+ _running = true;
+
+ byte[] buf = new byte[BUFFER_SIZE];
+ int n = 0;
+ try
+ {
+ while((n = _in.read(buf)) != -1)
+ _out.write(buf, 0, n);
+ }
+ catch(IOException e)
+ {
+ LOG.error("error while reading external command", e);
+ }
+
+ _running = false;
+ }
+
+ public byte[] getOutput()
+ {
+ if(_running)
+ throw new IllegalStateException("wait for process to be completed");
+
+ return _out.toByteArray();
+ }
+ }
+ /**
+* Constructor */
+ public ExternalCommand(ProcessBuilder processBuilder)
+ {
+ _processBuilder = processBuilder;
+ }
+
+ /**
+* After creating the command, you have to start it...
+*
+* @throws IOException
+*/
+ public void start() throws IOException
+ {
+ _process = _processBuilder.start();
+ //System.out.println(_processBuilder);
+ _out = new InputReader(new BufferedInputStream(_process.getInputStream()));
+ _err = new InputReader(new BufferedInputStream(_process.getErrorStream()));
+
+ _out.start();
+ _err.start();
+ }
+
+ /**
+* @see ProcessBuilder
+*/
+ public Map<String, String> getEnvironment()
+ {
+ return _processBuilder.environment();
+ }
+
+ /**
+* @see ProcessBuilder
+*/
+ public File getWorkingDirectory()
+ {
+ return _processBuilder.directory();
+ }
+
+ /**
+* @see ProcessBuilder
+*/
+ public void setWorkingDirectory(File directory)
+ {
+ _processBuilder.directory(directory);
+ }
+
+ /**
+* @see ProcessBuilder
+*/
+ public boolean getRedirectErrorStream()
+ {
+ return _processBuilder.redirectErrorStream();
+ }
+
+ /**
+* @see ProcessBuilder
+*/
+ public void setRedirectErrorStream(boolean redirectErrorStream)
+ {
+ _processBuilder.redirectErrorStream(redirectErrorStream);
+ }
+
+ public byte[] getOutput() throws InterruptedException
+ {
+ waitFor();
+ return _out.getOutput();
+ }
+
+ public byte[] getError() throws InterruptedException
+ {
+ waitFor();
+ return _err.getOutput();
+ }
+
+ /**
+* Returns the output as a string.
+*
+* @param encoding
+* @return encoded string
+* @throws InterruptedException
+* @throws UnsupportedEncodingException
+*/
+ public String getStringOutput(String encoding) throws InterruptedException,
+ UnsupportedEncodingException
+ {
+ return new String(getOutput(), encoding);
+ }
+
+ /**
+* Returns the output as a string. Uses encoding "UTF-8".
+*
+* @return utf8 encoded string
+* @throws InterruptedException
+*/
+ public String getStringOutput() throws InterruptedException
+ {
+ try
+ {
+ return getStringOutput("UTF-8");
+ }
+ catch(UnsupportedEncodingException e)
+ {
+ // should not happen
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+* Returns the error as a string.
+*
+* @param encoding
+* @return error as string
+* @throws InterruptedException
+* @throws UnsupportedEncodingException
+*/
+ public String getStringError(String encoding) throws InterruptedException,
+ UnsupportedEncodingException
+ {
+ return new String(getError(), encoding);
+ }
+
+ /**
+* Returns the error as a string. Uses encoding "UTF-8".
+*
+* @return error as string
+* @throws InterruptedException
+*/
+ public String getStringError() throws InterruptedException
+ {
+ try
+ {
+ return getStringError("UTF-8");
+ }
+ catch(UnsupportedEncodingException e)
+ {
+ // should not happen
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+* Properly waits until everything is complete: joins on the thread that
+* reads the output, joins on the thread that reads the error and finally
+* wait for the process to be finished.
+* @return the status code of the process.
+*
+* @throws InterruptedException
+*/
+ public int waitFor() throws InterruptedException
+ {
+ if(_process == null)
+ throw new IllegalStateException("you must call start first");
+
+ _out.join();
+ _err.join();
+ return _process.waitFor();
+ }
+
+ /**
+* Properly waits until everything is complete: joins on the thread that
+* reads the output, joins on the thread that reads the error and finally
+* wait for the process to be finished.
+* If the process has not completed before the timeout, throws a
+* {@link TimeoutException}
+* @return the status code of the process.
+*
+* @throws TimeoutException
+* @throws InterruptedException
+*/
+ public int waitFor(long timeout) throws InterruptedException, TimeoutException
+ {
+ if(_process == null)
+ throw new IllegalStateException("you must call start first");
+
+// Chronos c = new Chronos();
+ _out.join(timeout);
+// timeout -= c.tick();
+ if (timeout <= 0)
+ throw new TimeoutException("Wait timed out");
+ _err.join(timeout);
+// timeout -= c.tick();
+ if (timeout <= 0)
+ throw new TimeoutException("Wait timed out");
+
+ // there is no timeout in this API, not much we can do here
+ // waiting on the other two threads should give us some safety
+ return _process.waitFor();
+ }
+
+ public int exitValue()
+ {
+ if(_process == null)
+ throw new IllegalStateException("you must call start first");
+
+ return _process.exitValue();
+ }
+
+ public void destroy()
+ {
+ if(_process == null)
+ throw new IllegalStateException("you must call start first");
+
+ _process.destroy();
+ }
+
+ /**
+* Creates an external process from the command. It is not started and you have to call
+* start on it!
+*
+* @param commands the command to execute
+* @return the process */
+ public static ExternalCommand create(String... commands)
+ {
+ ExternalCommand ec = new ExternalCommand(new ProcessBuilder(commands));
+ return ec;
+ }
+
+ /**
+* Creates an external process from the command. It is not started and you have to call
+* start on it!
+*
+* @param commands the command to execute
+* @return the process */
+ public static ExternalCommand create(List<String> commands)
+ {
+ ExternalCommand ec = new ExternalCommand(new ProcessBuilder(commands));
+ return ec;
+ }
+
+ /**
+* Creates an external process from the command. The command is executed.
+*
+* @param commands the commands to execute
+* @return the process
+* @throws IOException if there is an error */
+ public static ExternalCommand start(String... commands) throws IOException
+ {
+ ExternalCommand ec = new ExternalCommand(new ProcessBuilder(commands));
+ ec.start();
+ return ec;
+ }
+
+ /**
+* Executes the external command in the given working directory and waits for it to be
+* finished.
+*
+* @param workingDirectory the root directory from where to run the command
+* @param command the command to execute (should be relative to the working directory
+* @param args the arguments to the command
+* @return the process */
+ public static ExternalCommand execute(File workingDirectory,
+ String command,
+ String... args)
+ throws IOException, InterruptedException
+ {
+ try
+ {
+ return executeWithTimeout(workingDirectory, command, 0, args);
+ }
+ catch (TimeoutException e)
+ {
+ // Can't happen!
+ throw new IllegalStateException(MODULE + ".execute: Unexpected timeout occurred!");
+ }
+ }
+
+/**
+* Executes the external command in the given working directory and waits (until timeout
+* is elapsed) for it to be finished.
+*
+* @param workingDirectory
+* the root directory from where to run the command
+* @param command
+* the command to execute (should be relative to the working directory
+* @param timeout
+* the maximum amount of time to wait for this external command (in ms). If
+* this value is less than or equal to 0, timeout is ignored
+* @param args
+* the arguments to the command
+* @return the process
+*/
+ public static ExternalCommand executeWithTimeout(File workingDirectory,
+ String command,
+ long timeout,
+ String... args)
+ throws IOException, InterruptedException, TimeoutException
+ {
+ List<String> arguments = new ArrayList<String>(args.length + 1);
+
+ arguments.add(new File(workingDirectory, command).getAbsolutePath());
+ arguments.addAll(Arrays.asList(args));
+
+ ExternalCommand cmd = ExternalCommand.create(arguments);
+
+ cmd.setWorkingDirectory(workingDirectory);
+
+ cmd.setRedirectErrorStream(true);
+
+ cmd.start();
+
+ /* Use timeout if it is a valid value! */
+ if (timeout <= 0)
+ cmd.waitFor();
+ else
+ cmd.waitFor(timeout);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug(cmd.getStringOutput());
+
+ return cmd;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileChangeWatcher.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileChangeWatcher.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileChangeWatcher.java
new file mode 100644
index 0000000..549da36
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileChangeWatcher.java
@@ -0,0 +1,11 @@
+package org.apache.helix.filestore;
+
+public interface FileChangeWatcher
+{
+ void onEntryModified(String path);
+
+ void onEntryAdded(String path);
+
+ void onEntryDeleted(String path);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStore.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStore.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStore.java
new file mode 100644
index 0000000..be94148
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStore.java
@@ -0,0 +1,124 @@
+package org.apache.helix.filestore;
+
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+
+public class FileStore
+{
+ private final String _zkAddr;
+ private final String _clusterName;
+ private final String _serverId;
+ private HelixManager _manager = null;
+
+ public FileStore(String zkAddr, String clusterName, String serverId)
+ {
+ _zkAddr = zkAddr;
+ _clusterName = clusterName;
+ _serverId = serverId;
+ }
+
+ public void connect()
+ {
+ try
+ {
+ _manager =
+ HelixManagerFactory.getZKHelixManager(_clusterName,
+ _serverId,
+ InstanceType.PARTICIPANT,
+ _zkAddr);
+
+ StateMachineEngine stateMach = _manager.getStateMachineEngine();
+ FileStoreStateModelFactory modelFactory =
+ new FileStoreStateModelFactory(_manager);
+ stateMach.registerStateModelFactory(SetupCluster.DEFAULT_STATE_MODEL, modelFactory);
+ _manager.connect();
+// _manager.addExternalViewChangeListener(replicator);
+ Thread.currentThread().join();
+ }
+ catch (InterruptedException e)
+ {
+ System.err.println(" [-] " + _serverId + " is interrupted ...");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ disconnect();
+ }
+ }
+
+ public void disconnect()
+ {
+ if (_manager != null)
+ {
+ _manager.disconnect();
+ }
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ if (args.length < 3)
+ {
+ System.err.println("USAGE: java FileStore zookeeperAddress (e.g. localhost:2181) serverId , rabbitmqServer (e.g. localhost)");
+ System.exit(1);
+ }
+
+ final String zkAddr = args[0];
+ final String clusterName = SetupCluster.DEFAULT_CLUSTER_NAME;
+ final String consumerId = args[1];
+
+ ZkClient zkclient = null;
+ try
+ {
+ // add node to cluster if not already added
+ zkclient =
+ new ZkClient(zkAddr,
+ ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+ new ZNRecordSerializer());
+ ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+
+ List<String> nodes = admin.getInstancesInCluster(clusterName);
+ if (!nodes.contains("consumer_" + consumerId))
+ {
+ InstanceConfig config = new InstanceConfig("consumer_" + consumerId);
+ config.setHostName("localhost");
+ config.setInstanceEnabled(true);
+ admin.addInstance(clusterName, config);
+ }
+
+ // start consumer
+ final FileStore consumer =
+ new FileStore(zkAddr, clusterName, "consumer_" + consumerId);
+
+ Runtime.getRuntime().addShutdownHook(new Thread()
+ {
+ @Override
+ public void run()
+ {
+ System.out.println("Shutting down consumer_" + consumerId);
+ consumer.disconnect();
+ }
+ });
+
+ consumer.connect();
+ }
+ finally
+ {
+ if (zkclient != null)
+ {
+ zkclient.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java
new file mode 100644
index 0000000..760bcf1
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java
@@ -0,0 +1,221 @@
+package org.apache.helix.filestore;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+@StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "MASTER",
+ "SLAVE" })
+public class FileStoreStateModel extends StateModel
+{
+ private final class HighWaterMarkUpdater implements DataUpdater<ZNRecord>
+ {
+ private final Message message;
+ private final ChangeRecord lastRecordProcessed;
+
+ private HighWaterMarkUpdater(Message message,
+ ChangeRecord lastRecordProcessed)
+ {
+ this.message = message;
+ this.lastRecordProcessed = lastRecordProcessed;
+ }
+
+ @Override
+ public ZNRecord update(ZNRecord currentData)
+ {
+ ZNRecord newRec = new ZNRecord(message.getResourceName());
+
+ if (currentData != null)
+ {
+ int currentGen = convertToInt(newRec.getSimpleField("currentGen"), 0);
+ int currentGenStartSeq = convertToInt(
+ newRec.getSimpleField("currentGenStartSeq"), 0);
+ int prevGen = convertToInt(newRec.getSimpleField("prevGen"), 0);
+ int prevGenEndSeq = convertToInt(
+ newRec.getSimpleField("prevGenEndSeq"), 0);
+ newRec.setSimpleField("currentGen", Integer.toString(currentGen + 1));
+ newRec.setSimpleField("currentGenStartSeq", Integer.toString(1));
+ if (currentGen > 0)
+ {
+ newRec.setSimpleField("prevGen", Integer.toString(currentGen));
+ int localEndSeq = 1;
+ if (lastRecordProcessed != null)
+ {
+ localEndSeq = (int) lastRecordProcessed.txid;
+ }
+ newRec.setSimpleField("prevGenEndSeq", "" + localEndSeq);
+ }
+ newRec.merge(currentData);
+ } else
+ {
+ newRec.setSimpleField("currentGen", Integer.toString(1));
+ newRec.setSimpleField("currentGenStartSeq", Integer.toString(1));
+ }
+ return newRec;
+
+ }
+
+ private int convertToInt(String number, int defaultValue)
+ {
+ try
+ {
+ if (number != null)
+ {
+ return Integer.parseInt(number);
+ }
+ } catch (Exception e)
+ {
+
+ }
+ return defaultValue;
+ }
+ }
+
+ private static Logger LOG = Logger.getLogger(FileStoreStateModel.class);
+
+ private final String _serverId;
+ private final String _partition;
+
+ private Replicator replicator;
+
+ private ChangeLogGenerator generator;
+
+ private FileSystemWatchService service;
+
+ private InstanceConfig instanceConfig;
+
+ public FileStoreStateModel(HelixManager manager, String resource,
+ String partition)
+ {
+ String clusterName = manager.getClusterName();
+ String instanceName = manager.getInstanceName();
+ instanceConfig = manager.getClusterManagmentTool().getInstanceConfig(
+ clusterName, instanceName);
+ replicator = new Replicator(instanceConfig, resource, partition);
+ try
+ {
+ manager.addExternalViewChangeListener(replicator);
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ _partition = partition;
+ _serverId = instanceName;
+ }
+
+ /**
+ * If the node is slave, start the rsync thread if it is not started
+ *
+ * @param message
+ * @param context
+ * @throws Exception
+ */
+
+ @Transition(from = "OFFLINE", to = "SLAVE")
+ public void onBecomeSlaveFromOffline(Message message,
+ NotificationContext context) throws Exception
+ {
+ System.out.println(_serverId + " transitioning from " + message.getFromState()
+ + " to " + message.getToState() + " for " + _partition);
+
+ replicator.start();
+ System.out.println(_serverId + " transitioned from " + message.getFromState()
+ + " to " + message.getToState() + " for " + _partition);
+ }
+
+ /**
+ * When the node becomes master, it will start accepting writes and increments
+ * the epoch and starts logging the changes in a file
+ *
+ * @param message
+ * @param context
+ * @throws Exception
+ */
+ @Transition(from = "SLAVE", to = "MASTER")
+ public void onBecomeMasterFromSlave(final Message message,
+ NotificationContext context) throws Exception
+ {
+ replicator.stop();
+ System.out.println(_serverId + " transitioning from " + message.getFromState()
+ + " to " + message.getToState() + " for " + _partition);
+ ZkHelixPropertyStore<ZNRecord> helixPropertyStore = context.getManager()
+ .getHelixPropertyStore();
+ String checkpointDirPath = instanceConfig.getRecord().getSimpleField(
+ "check_point_dir");
+ CheckpointFile checkpointFile = new CheckpointFile(checkpointDirPath);
+ final ChangeRecord lastRecordProcessed = checkpointFile
+ .findLastRecordProcessed();
+ DataUpdater<ZNRecord> updater = new HighWaterMarkUpdater(message,
+ lastRecordProcessed);
+ helixPropertyStore.update(
+ "TRANSACTION_ID_METADATA" + "/" + message.getResourceName(), updater,
+ AccessOption.PERSISTENT);
+ Stat stat = new Stat();
+ ;
+ ZNRecord znRecord = helixPropertyStore.get("TRANSACTION_ID_METADATA" + "/"
+ + message.getResourceName(), stat, AccessOption.PERSISTENT);
+ int startGen = Integer.parseInt(znRecord.getSimpleField("currentGen"));
+ int startSeq = Integer.parseInt(znRecord
+ .getSimpleField("currentGenStartSeq"));
+ String fileStoreDir = instanceConfig.getRecord().getSimpleField("file_store_dir");
+ String changeLogDir = instanceConfig.getRecord().getSimpleField("change_log_dir");
+
+ generator = new ChangeLogGenerator(changeLogDir, startGen, startSeq);
+ //To indicate that we need callbacks for changes that happen starting now
+ long now = System.currentTimeMillis();
+ service = new FileSystemWatchService(fileStoreDir, now, generator);
+ service.start();
+ System.out.println(_serverId + " transitioned from " + message.getFromState()
+ + " to " + message.getToState() + " for " + _partition);
+ }
+
+ /**
+ * Stop writing
+ *
+ * @param message
+ * @param context
+ * @throws Exception
+ */
+
+ @Transition(from = "MASTER", to = "SLAVE")
+ public void onBecomeSlaveFromMaster(Message message,
+ NotificationContext context) throws Exception
+ {
+ service.stop();
+ LOG.info(_serverId + " transitioning from " + message.getFromState()
+ + " to " + message.getToState() + " for " + _partition);
+ replicator.start();
+ }
+
+ @Transition(from = "SLAVE", to = "OFFLINE")
+ public void onBecomeOfflineFromSlave(Message message,
+ NotificationContext context)
+ {
+ replicator.stop();
+ LOG.info(_serverId + " transitioning from " + message.getFromState()
+ + " to " + message.getToState() + " for " + _partition);
+ }
+
+ public void onBecomeDroppedFromOffline(Message message,
+ NotificationContext context)
+ {
+ LOG.info(_serverId + " Dropping partition " + _partition);
+ }
+
+ @Override
+ public void reset()
+ {
+ LOG.warn("Default reset() invoked");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
new file mode 100644
index 0000000..5e36874
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
@@ -0,0 +1,21 @@
+package org.apache.helix.filestore;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+public class FileStoreStateModelFactory extends StateModelFactory<FileStoreStateModel>
+{
+ private final HelixManager manager;
+ public FileStoreStateModelFactory(HelixManager manager)
+ {
+ this.manager = manager;
+ }
+
+ @Override
+ public FileStoreStateModel createNewStateModel(String partition)
+ {
+ FileStoreStateModel model;
+ model = new FileStoreStateModel(manager,partition.split("_")[0], partition);
+ return model;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileSystemWatchService.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileSystemWatchService.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileSystemWatchService.java
new file mode 100644
index 0000000..c4826ed
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileSystemWatchService.java
@@ -0,0 +1,171 @@
+package org.apache.helix.filestore;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.jci.listeners.AbstractFilesystemAlterationListener;
+import org.apache.commons.jci.monitor.FilesystemAlterationMonitor;
+
+public class FileSystemWatchService
+{
+ enum ChangeType
+ {
+ CREATE, DELETE, MODIFY
+ };
+
+ private FilesystemAlterationMonitor fam;
+ private MyFilesystemAlterationListener listener;
+ private Thread thread;
+
+ public FileSystemWatchService(String root, FileChangeWatcher... watchers)
+ {
+ this(root, -1, watchers);
+ }
+
+ public FileSystemWatchService(String root, long startTime,
+ FileChangeWatcher... watchers)
+ {
+ File file = new File(root);
+ System.out.println("Setting up watch service for path:"
+ + file.getAbsolutePath());
+ fam = new FilesystemAlterationMonitor();
+ listener = new MyFilesystemAlterationListener(root, startTime, watchers);
+ fam.addListener(file, listener);
+ }
+
+ public void start()
+ {
+ fam.start();
+ thread = new Thread(listener);
+ thread.start();
+ }
+
+ static class MyFilesystemAlterationListener extends
+ AbstractFilesystemAlterationListener implements Runnable
+ {
+ private final FileChangeWatcher[] watchers;
+ private int length;
+ private final long startTime;
+ private AtomicBoolean stopRequest;
+
+ public MyFilesystemAlterationListener(String root, long startTime,
+ FileChangeWatcher[] watchers)
+ {
+ this.startTime = startTime;
+ File file = new File(root);
+ length = root.length() + 1;
+ this.watchers = watchers;
+ stopRequest = new AtomicBoolean(false);
+
+ }
+
+ @SuppressWarnings("unchecked")
+ public void run()
+ {
+ while (!stopRequest.get())
+ {
+ try
+ {
+ waitForCheck();
+ process(getCreatedDirectories(), watchers, ChangeType.CREATE);
+ process(getDeletedDirectories(), watchers, ChangeType.DELETE);
+ process(getChangedDirectories(), watchers, ChangeType.MODIFY);
+ process(getCreatedFiles(), watchers, ChangeType.CREATE);
+ process(getDeletedFiles(), watchers, ChangeType.DELETE);
+ process(getChangedFiles(), watchers, ChangeType.MODIFY);
+
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void process(Collection<File> files, FileChangeWatcher[] watchers,
+ ChangeType type)
+ {
+ if (files.size() > 0)
+ {
+ for (File file : files)
+ {
+ if (file.lastModified() < startTime)
+ {
+ continue;
+ }
+ String path = file.getPath();
+ String relativePath = ".";
+ if (path.length() > length)
+ {
+ relativePath = path.substring(length);
+ }
+ for (FileChangeWatcher watcher : watchers)
+ {
+ switch (type)
+ {
+ case CREATE:
+ watcher.onEntryAdded(relativePath);
+ break;
+ case DELETE:
+ watcher.onEntryDeleted(relativePath);
+ break;
+ case MODIFY:
+ watcher.onEntryModified(relativePath);
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ public void stop()
+ {
+ stopRequest.set(true);
+ }
+ }
+
+ public void stop()
+ {
+ listener.stop();
+ fam.stop();
+ thread.interrupt();
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ FileChangeWatcher defaultWatcher = new FileChangeWatcher()
+ {
+
+ @Override
+ public void onEntryModified(String path)
+ {
+ System.out
+ .println("FileSystemWatchService.main(...).new FileChangeWatcher() {...}.onEntryModified():"
+ + path);
+ }
+
+ @Override
+ public void onEntryDeleted(String path)
+ {
+ System.out
+ .println("FileSystemWatchService.main(...).new FileChangeWatcher() {...}.onEntryDeleted():"
+ + path);
+ }
+
+ @Override
+ public void onEntryAdded(String path)
+ {
+ System.out
+ .println("FileSystemWatchService.main(...).new FileChangeWatcher() {...}.onEntryAdded() : "
+ + path);
+ }
+ };
+ ChangeLogGenerator ChangeLogGenerator = new ChangeLogGenerator(
+ "data/localhost_12000/translog", 1, 1);
+ FileSystemWatchService watchService = new FileSystemWatchService(
+ "data/localhost_12000/filestore", defaultWatcher);
+ watchService.start();
+ Thread.sleep(10000);
+ watchService.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/IntegrationTest.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/IntegrationTest.java
new file mode 100644
index 0000000..9167549
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/IntegrationTest.java
@@ -0,0 +1,201 @@
+package org.apache.helix.filestore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.StateModelConfigGenerator;
+
+public class IntegrationTest
+{
+
+ public static void main(String[] args) throws InterruptedException
+ {
+ ZkServer server = null;
+ ;
+
+ try
+ {
+ String baseDir = "/tmp/IntegrationTest/";
+ final String dataDir = baseDir + "zk/dataDir";
+ final String logDir = baseDir + "/tmp/logDir";
+ FileUtils.deleteDirectory(new File(dataDir));
+ FileUtils.deleteDirectory(new File(logDir));
+
+ IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
+ {
+ @Override
+ public void createDefaultNameSpace(ZkClient zkClient)
+ {
+
+ }
+ };
+ int zkPort = 2199;
+ final String zkAddress = "localhost:" + zkPort;
+
+ server = new ZkServer(dataDir, logDir, defaultNameSpace, zkPort);
+ server.start();
+ ClusterSetup setup = new ClusterSetup(zkAddress);
+ final String clusterName = "file-store-test";
+ setup.deleteCluster(clusterName);
+ setup.addCluster(clusterName, true);
+ setup.addInstanceToCluster(clusterName, "localhost", 12001);
+ setup.addInstanceToCluster(clusterName, "localhost", 12002);
+ setup.addInstanceToCluster(clusterName, "localhost", 12003);
+ setup.addResourceToCluster(clusterName, "repository", 1, "MasterSlave");
+ setup.reblanceResource(clusterName, "repository", 3);
+ // Set the configuration
+ final String instanceName1 = "localhost_12001";
+ addConfiguration(setup, baseDir, clusterName, instanceName1);
+ final String instanceName2 = "localhost_12002";
+ addConfiguration(setup, baseDir, clusterName, instanceName2);
+ final String instanceName3 = "localhost_12003";
+ addConfiguration(setup, baseDir, clusterName, instanceName3);
+ Thread thread1 = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ FileStore fileStore = null;
+
+ try
+ {
+ fileStore = new FileStore(zkAddress, clusterName, instanceName1);
+ fileStore.connect();
+ } catch (Exception e)
+ {
+ System.err.println("Exception" + e);
+ fileStore.disconnect();
+ }
+ }
+
+ });
+ // START NODES
+ Thread thread2 = new Thread(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ FileStore fileStore = new FileStore(zkAddress, clusterName,
+ instanceName2);
+ fileStore.connect();
+ }
+ });
+ // START NODES
+ Thread thread3 = new Thread(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ FileStore fileStore = new FileStore(zkAddress, clusterName,
+ instanceName3);
+ fileStore.connect();
+ }
+ });
+ System.out.println("STARTING NODES");
+ thread1.start();
+ thread2.start();
+ thread3.start();
+
+ // Start Controller
+ final HelixManager manager = HelixControllerMain.startHelixController(
+ zkAddress, clusterName, "controller", HelixControllerMain.STANDALONE);
+ Thread.sleep(5000);
+ printStatus(manager);
+ listFiles(baseDir);
+ System.out.println("Writing files a.txt and b.txt to current master "
+ + baseDir + "localhost_12001" + "/filestore");
+ FileUtils.writeStringToFile(new File(baseDir + "localhost_12001"
+ + "/filestore/a.txt"), "some_data in a");
+ FileUtils.writeStringToFile(new File(baseDir + "localhost_12001"
+ + "/filestore/b.txt"), "some_data in b");
+ Thread.sleep(10000);
+ listFiles(baseDir);
+ Thread.sleep(5000);
+ System.out.println("Stopping the MASTER node:" + "localhost_12001");
+ thread1.interrupt();
+ Thread.sleep(10000);
+ printStatus(manager);
+ System.out.println("Writing files c.txt and d.txt to current master "
+ + baseDir + "localhost_12002" + "/filestore");
+ FileUtils.writeStringToFile(new File(baseDir + "localhost_12002"
+ + "/filestore/c.txt"), "some_data in c");
+ FileUtils.writeStringToFile(new File(baseDir + "localhost_12002"
+ + "/filestore/d.txt"), "some_data in d");
+ listFiles(baseDir);
+ System.out.println("Create or modify any files under " + baseDir
+ + "localhost_12002" + "/filestore"
+ + " and it should get replicated to " + baseDir + "localhost_12003"
+ + "/filestore");
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ } finally
+ {
+ if (server != null)
+ {
+ // server.shutdown();
+ }
+ }
+ Thread.currentThread().join();
+ }
+
+ private static void listFiles(String baseDir)
+ {
+ System.out.println("===============FILES===============================");
+ String[] instances = new String[] { "localhost_12001", "localhost_12002",
+ "localhost_12003" };
+ for (String instance : instances)
+ {
+ String dir = baseDir + instance + "/filestore";
+ String[] list = new File(dir).list();
+ System.out.println(dir + ":"
+ + ((list != null) ? Arrays.toString(list) : "NONE"));
+ }
+ System.out.println("===============FILES===============================");
+ }
+
+ private static void printStatus(final HelixManager manager)
+ {
+ System.out.println("CLUSTER STATUS");
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ System.out.println("External View \n"
+ + helixDataAccessor.getProperty(keyBuilder.externalView("repository")));
+ }
+
+ private static void addConfiguration(ClusterSetup setup, String baseDir,
+ String clusterName, String instanceName) throws IOException
+ {
+ Map<String, String> properties = new HashMap<String, String>();
+ ConfigScopeBuilder builder = new ConfigScopeBuilder();
+ ConfigScope instanceScope = builder.forCluster(clusterName)
+ .forParticipant(instanceName).build();
+ properties.put("change_log_dir", baseDir + instanceName + "/translog");
+ properties.put("file_store_dir", baseDir + instanceName + "/filestore");
+ properties.put("check_point_dir", baseDir + instanceName + "/checkpoint");
+ setup.getClusterManagementTool().setConfig(instanceScope, properties);
+ FileUtils.deleteDirectory(new File(properties.get("change_log_dir")));
+ FileUtils.deleteDirectory(new File(properties.get("file_store_dir")));
+ FileUtils.deleteDirectory(new File(properties.get("check_point_dir")));
+ new File(properties.get("change_log_dir")).mkdirs();
+ new File(properties.get("file_store_dir")).mkdirs();
+ new File(properties.get("check_point_dir")).mkdirs();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/Replicator.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/Replicator.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/Replicator.java
new file mode 100644
index 0000000..dddf9c9
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/Replicator.java
@@ -0,0 +1,166 @@
+package org.apache.helix.filestore;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+
+public class Replicator extends RoutingTableProvider
+{
+ private InstanceConfig currentMasterConfig;
+ private final InstanceConfig localInstanceConfig;
+
+ private final String partition;
+ private final String resourceName;
+ AtomicBoolean isReplicationInitiated;
+ AtomicBoolean isReplicationStarted;
+ RsyncInvoker rsyncInvoker;
+ private ChangeLogProcessor processor;
+ private FileSystemWatchService watchService;
+ private ChangeLogReader reader;
+
+ public void setRsyncInvoker(RsyncInvoker rsyncInvoker)
+ {
+ this.rsyncInvoker = rsyncInvoker;
+ }
+
+ public Replicator(InstanceConfig localInstanceConfig, String resourceName,
+ String partition)
+ {
+ this.localInstanceConfig = localInstanceConfig;
+ this.resourceName = resourceName;
+ this.partition = partition;
+ isReplicationInitiated = new AtomicBoolean(false);
+ isReplicationStarted = new AtomicBoolean(false);
+
+ }
+
+ public void start() throws Exception
+ {
+//System.out.println("Starting replication for ");
+ isReplicationInitiated.set(true);
+
+ List<InstanceConfig> instances = getInstances(resourceName, partition,
+ "MASTER");
+ if (instances.size() > 0)
+ {
+ if (instances.size() == 1)
+ {
+ InstanceConfig newMasterConfig = instances.get(0);
+ String master = newMasterConfig.getInstanceName();
+ if (currentMasterConfig == null
+ || !master.equalsIgnoreCase(currentMasterConfig.getInstanceName()))
+ {
+ System.out.println("Found new master:"
+ + newMasterConfig.getInstanceName());
+ if(currentMasterConfig!=null){
+ stop();
+ }
+ currentMasterConfig = newMasterConfig;
+ startReplication(currentMasterConfig);
+ } else
+ {
+ System.out.println("Already replicating from " + master);
+ }
+ } else
+ {
+ System.out.println("Invalid number of masters found:" + instances);
+ }
+ } else
+ {
+ System.out.println("No master found");
+ }
+ }
+
+ public void startReplication(InstanceConfig masterInstanceConfig)
+ throws Exception
+ {
+ String remoteHost = masterInstanceConfig.getHostName();
+ String remoteChangeLogDir = masterInstanceConfig.getRecord()
+ .getSimpleField("change_log_dir");
+ String remoteFilestoreDir = masterInstanceConfig.getRecord()
+ .getSimpleField("file_store_dir");
+
+ String localChangeLogDir = localInstanceConfig.getRecord().getSimpleField(
+ "change_log_dir");
+ String localFilestoreDir = localInstanceConfig.getRecord().getSimpleField(
+ "file_store_dir");
+ String localcheckpointDir = localInstanceConfig.getRecord().getSimpleField(
+ "check_point_dir");
+ // setup rsync for the change log directory
+ setupRsync(remoteHost, remoteChangeLogDir, localChangeLogDir);
+ reader = new ChangeLogReader(localChangeLogDir);
+ watchService = new FileSystemWatchService(localChangeLogDir, reader);
+ processor = new ChangeLogProcessor(reader, remoteHost, remoteFilestoreDir,
+ localFilestoreDir, localcheckpointDir);
+ watchService.start();
+ processor.start();
+ isReplicationStarted.set(true);
+ }
+
+ private void setupRsync(String remoteHost, String remoteBaseDir,
+ String localBaseDir) throws Exception
+ {
+ rsyncInvoker = new RsyncInvoker(remoteHost, remoteBaseDir, localBaseDir);
+ boolean started = rsyncInvoker.runInBackground();
+ if (started)
+ {
+ System.out.println("Rsync thread started in background");
+ } else
+ {
+ throw new Exception("Unable to start rsync thread");
+ }
+ }
+
+ public void stop()
+ {
+ if (isReplicationStarted.get())
+ {
+ System.out.println("Stopping replication from current master:"+ currentMasterConfig.getInstanceName());
+ rsyncInvoker.stop();
+ watchService.stop();
+ processor.stop();
+ }
+ isReplicationInitiated.set(false);
+ }
+
+ @Override
+ public void onExternalViewChange(List<ExternalView> viewList,
+ NotificationContext context)
+ {
+ super.onExternalViewChange(viewList, context);
+
+ if (isReplicationInitiated.get())
+ {
+ try
+ {
+ start();
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ InstanceConfig localInstanceConfig = new InstanceConfig("localhost_12001");
+ ZNRecord record = localInstanceConfig.getRecord();
+ record.setSimpleField("change_log_dir", "data/localhost_12001/translog");
+ record.setSimpleField("file_store_dir", "data/localhost_12001/filestore");
+ record.setSimpleField("check_point_dir", "data/localhost_12001/checkpoint");
+ InstanceConfig masterInstanceConfig = new InstanceConfig("localhost_12001");
+ record = masterInstanceConfig.getRecord();
+ record.setSimpleField("change_log_dir", "data/localhost_12000/translog");
+ record.setSimpleField("file_store_dir", "data/localhost_12000/filestore");
+ record.setSimpleField("check_point_dir", "data/localhost_12000/checkpoint");
+ Replicator replicator = new Replicator(localInstanceConfig, "resource",
+ "partition");
+ replicator.startReplication(masterInstanceConfig);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncDaemon.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncDaemon.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncDaemon.java
new file mode 100644
index 0000000..45f1763
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncDaemon.java
@@ -0,0 +1,29 @@
+package org.apache.helix.filestore;
+
+
+public class RsyncDaemon
+{
+ public boolean start()
+ {
+ ProcessBuilder pb = new ProcessBuilder("sudo", "rync", "--daemon");
+ ExternalCommand externalCommand = new ExternalCommand(pb);
+ try
+ {
+ int exitVal = externalCommand.waitFor();
+ if(exitVal!=0){
+ return true;
+ }
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+
+
+ public boolean stop()
+ {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncInvoker.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncInvoker.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncInvoker.java
new file mode 100644
index 0000000..2c94c5e
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncInvoker.java
@@ -0,0 +1,114 @@
+package org.apache.helix.filestore;
+
+import java.io.IOException;
+
+public class RsyncInvoker
+{
+ private Thread backgroundThread;
+ private final String remoteHost;
+ private final String remoteLogDir;
+ private final String localLogDir;
+
+ public RsyncInvoker(String remoteHost, String remoteBaseDir,
+ String localBaseDir)
+ {
+ this.remoteHost = remoteHost;
+ this.remoteLogDir = remoteBaseDir;
+ this.localLogDir = localBaseDir;
+ }
+
+ public boolean rsync(String relativePath)
+ {
+ int exitVal = -1;
+ try
+ {
+ ProcessBuilder pb = new ProcessBuilder( "rsync",
+ remoteLogDir+"/"+ relativePath , localLogDir);
+ System.out.println("Rsyncing source:"+ remoteLogDir+"/"+ relativePath + " dest:"+ localLogDir);
+ ExternalCommand ec = new ExternalCommand(pb);
+ ec.start();
+ exitVal = ec.waitFor();
+ if (exitVal != 0)
+ {
+ System.out.println("Failed to rsync "+ ec.getStringError());
+ } else
+ {
+ return true;
+ }
+ } catch (IOException e)
+ {
+ e.printStackTrace();
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+
+ return false;
+ }
+ public boolean stop(){
+ if( backgroundThread!=null){
+ backgroundThread.interrupt();
+ }
+ return true;
+ }
+ public boolean runInBackground()
+ {
+ backgroundThread = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ int sleep = 1000;
+ while (true)
+ {
+ int exitVal = -1;
+ try
+ {
+ Thread.sleep(sleep);
+ ProcessBuilder pb = new ProcessBuilder( "rsync", "-rvt",
+ remoteLogDir+"/", localLogDir);
+ //System.out.println("Background rsync source:"+remoteLogDir+"/" +" dest:"+ localLogDir);
+ ExternalCommand ec;
+ ec = new ExternalCommand(pb);
+ ec.start();
+ exitVal = ec.waitFor();
+ String stringError = ec.getStringError();
+ if(stringError!=null && stringError.length()>0)
+ System.err.println(stringError);
+ //System.out.println(ec.getStringOutput());
+
+ } catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ if (exitVal != 0)
+ {
+
+ sleep = Math.min(2 * sleep, 2 * 60 * 1000);
+ System.out.println("Failed to rsync retrying in " + sleep / 1000
+ + " seconds");
+ } else
+ {
+ sleep = 1000;
+ }
+ }
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ Thread.currentThread().interrupt();
+ }
+ }
+ });
+ backgroundThread.start();
+ return true;
+ }
+ public static void main(String[] args)
+ {
+ String remoteHost= "localhost";
+ String remoteLogDir="data/localhost_12000/translog";
+ String localLogDir="data/localhost_12001/translog";
+ RsyncInvoker rsyncInvoker = new RsyncInvoker(remoteHost, remoteLogDir, localLogDir);
+ rsyncInvoker.runInBackground();
+ }
+}