You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/12/07 10:22:43 UTC

[1/2] git commit: [HELIX-13] Initial commit for near real time rsync replicated file system

Updated Branches:
  refs/heads/master 4b0688333 -> 342ea1c27


[HELIX-13] Initial commit for near real time rsync replicated file system


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/342ea1c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/342ea1c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/342ea1c2

Branch: refs/heads/master
Commit: 342ea1c277651a6e3cf85b10949c2953d93618dd
Parents: 4b06883
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Dec 7 01:21:09 2012 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Dec 7 01:22:25 2012 -0800

----------------------------------------------------------------------
 recipes/rsync-replicated-file-system/README.md     |    1 +
 recipes/rsync-replicated-file-system/bin/log4j.xml |   19 +
 .../bin/send-message.sh                            |    8 +
 .../bin/setup-cluster.sh                           |    8 +
 .../bin/start-cluster-manager.sh                   |    8 +
 .../bin/start-consumer.sh                          |    8 +
 .../data/localhost_12000/translog/log.0            |  Bin 0 -> 126 bytes
 .../checkpoint/lastprocessedchange.checkpoint      |    1 +
 .../checkpoint/lastprocessedchange.checkpoint.bak  |    1 +
 .../data/localhost_12001/translog/log.0            |  Bin 0 -> 126 bytes
 recipes/rsync-replicated-file-system/pom.xml       |  109 ++++
 .../java/org/apache/helix/filestore/ChangeLog.java |   13 +
 .../apache/helix/filestore/ChangeLogGenerator.java |  130 +++++
 .../apache/helix/filestore/ChangeLogProcessor.java |   93 ++++
 .../apache/helix/filestore/ChangeLogReader.java    |  150 ++++++
 .../org/apache/helix/filestore/ChangeRecord.java   |   70 +++
 .../org/apache/helix/filestore/CheckpointFile.java |   76 +++
 .../apache/helix/filestore/ExternalCommand.java    |  401 +++++++++++++++
 .../apache/helix/filestore/FileChangeWatcher.java  |   11 +
 .../java/org/apache/helix/filestore/FileStore.java |  124 +++++
 .../helix/filestore/FileStoreStateModel.java       |  221 ++++++++
 .../filestore/FileStoreStateModelFactory.java      |   21 +
 .../helix/filestore/FileSystemWatchService.java    |  171 ++++++
 .../apache/helix/filestore/IntegrationTest.java    |  201 ++++++++
 .../org/apache/helix/filestore/Replicator.java     |  166 ++++++
 .../org/apache/helix/filestore/RsyncDaemon.java    |   29 +
 .../org/apache/helix/filestore/RsyncInvoker.java   |  114 ++++
 .../org/apache/helix/filestore/SetupCluster.java   |   58 ++
 .../helix/filestore/StartClusterManager.java       |   42 ++
 .../main/java/org/apache/helix/filestore/Test.java |  143 +++++
 30 files changed, 2397 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/README.md
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/README.md b/recipes/rsync-replicated-file-system/README.md
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/README.md
@@ -0,0 +1 @@
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/bin/log4j.xml
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/bin/log4j.xml b/recipes/rsync-replicated-file-system/bin/log4j.xml
new file mode 100644
index 0000000..09ed7cd
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/bin/log4j.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  
+  <appender name="ConsoleAppender" class="org.apache.log4j.ConsoleAppender">
+    <param name="Target" value="System.err" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c{1}] [perf-test] %m%n" />
+    </layout>
+  </appender>
+	  
+  <root>
+    <priority value="debug"/>
+    <appender-ref ref="ConsoleAppender"/>
+  </root>
+  
+</log4j:configuration>
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/bin/send-message.sh
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/bin/send-message.sh b/recipes/rsync-replicated-file-system/bin/send-message.sh
new file mode 100755
index 0000000..89ee65a
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/bin/send-message.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+script_dir=`dirname $0`
+LIB=$script_dir/../lib
+CLASSPATH=$script_dir/../target/classes:"$LIB"/helix-core-0.1-SNAPSHOT-incubating.jar:"$LIB"/rabbitmq-client.jar:"$LIB"/commons-cli-1.1.jar:"$LIB"/commons-io-1.2.jar:"$LIB"/commons-math-2.1.jar:"$LIB"/jackson-core-asl-1.8.5.jar:"$LIB"/jackson-mapper-asl-1.8.5.jar:"$LIB"/log4j-1.2.15.jar:"$LIB"/org.restlet-1.1.10.jar:"$LIB"/zkclient-0.1.jar:"$LIB"/zookeeper-3.3.4.jar
+# echo $CLASSPATH
+
+java -cp "$CLASSPATH" org.apache.helix.filestore.Emitter $@

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/bin/setup-cluster.sh
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/bin/setup-cluster.sh b/recipes/rsync-replicated-file-system/bin/setup-cluster.sh
new file mode 100755
index 0000000..b924182
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/bin/setup-cluster.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+script_dir=`dirname $0`
+LIB=$script_dir/../lib
+CLASSPATH=$script_dir/../target/classes:"$LIB"/helix-core-0.1-SNAPSHOT-incubating.jar:"$LIB"/rabbitmq-client.jar:"$LIB"/commons-cli-1.1.jar:"$LIB"/commons-io-1.2.jar:"$LIB"/commons-math-2.1.jar:"$LIB"/jackson-core-asl-1.8.5.jar:"$LIB"/jackson-mapper-asl-1.8.5.jar:"$LIB"/log4j-1.2.15.jar:"$LIB"/org.restlet-1.1.10.jar:"$LIB"/zkclient-0.1.jar:"$LIB"/zookeeper-3.3.4.jar
+# echo $CLASSPATH
+
+java -cp "$CLASSPATH" org.apache.helix.filestore.SetupConsumerCluster $@

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/bin/start-cluster-manager.sh
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/bin/start-cluster-manager.sh b/recipes/rsync-replicated-file-system/bin/start-cluster-manager.sh
new file mode 100755
index 0000000..e8e0698
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/bin/start-cluster-manager.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+#
+# Launches org.apache.helix.filestore.StartClusterManager with the recipe
+# classpath, forwarding all command-line arguments unchanged.
+
+# Quote $0 so the script still works when its path contains spaces.
+script_dir=$(dirname "$0")
+LIB=$script_dir/../lib
+CLASSPATH=$script_dir/../target/classes:"$LIB"/helix-core-0.1-SNAPSHOT-incubating.jar:"$LIB"/rabbitmq-client.jar:"$LIB"/commons-cli-1.1.jar:"$LIB"/commons-io-1.2.jar:"$LIB"/commons-math-2.1.jar:"$LIB"/jackson-core-asl-1.8.5.jar:"$LIB"/jackson-mapper-asl-1.8.5.jar:"$LIB"/log4j-1.2.15.jar:"$LIB"/org.restlet-1.1.10.jar:"$LIB"/zkclient-0.1.jar:"$LIB"/zookeeper-3.3.4.jar
+# echo $CLASSPATH
+
+# "$@" keeps each argument intact; a bare $@ re-splits arguments on whitespace.
+java -cp "$CLASSPATH" org.apache.helix.filestore.StartClusterManager "$@"

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/bin/start-consumer.sh
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/bin/start-consumer.sh b/recipes/rsync-replicated-file-system/bin/start-consumer.sh
new file mode 100755
index 0000000..01b2ac6
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/bin/start-consumer.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+#
+# Launches org.apache.helix.filestore.FileStore with the recipe classpath,
+# forwarding all command-line arguments unchanged.
+
+# Quote $0 so the script still works when its path contains spaces.
+script_dir=$(dirname "$0")
+LIB=$script_dir/../lib
+CLASSPATH=$script_dir/../target/classes:"$LIB"/helix-core-0.1-SNAPSHOT-incubating.jar:"$LIB"/rabbitmq-client.jar:"$LIB"/commons-cli-1.1.jar:"$LIB"/commons-io-1.2.jar:"$LIB"/commons-math-2.1.jar:"$LIB"/jackson-core-asl-1.8.5.jar:"$LIB"/jackson-mapper-asl-1.8.5.jar:"$LIB"/log4j-1.2.15.jar:"$LIB"/org.restlet-1.1.10.jar:"$LIB"/zkclient-0.1.jar:"$LIB"/zookeeper-3.3.4.jar
+# echo $CLASSPATH
+
+# "$@" keeps each argument intact; a bare $@ re-splits arguments on whitespace.
+java -cp "$CLASSPATH" org.apache.helix.filestore.FileStore "$@"

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/a
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/a b/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/a
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/b
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/b b/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/b
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/c
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/c b/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/c
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/d
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/d b/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/d
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/e
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/e b/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/e
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/f
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/f b/recipes/rsync-replicated-file-system/data/localhost_12000/filestore/f
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.0
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.0 b/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.0
new file mode 100644
index 0000000..d73406d
Binary files /dev/null and b/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.0 differ

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.1
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.1 b/recipes/rsync-replicated-file-system/data/localhost_12000/translog/log.1
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint b/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint
new file mode 100644
index 0000000..77320cb
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint
@@ -0,0 +1 @@
+2147483655|1354784309854|f|log.0|105|126
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint.bak
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint.bak b/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint.bak
new file mode 100644
index 0000000..9f9165f
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/data/localhost_12001/checkpoint/lastprocessedchange.checkpoint.bak
@@ -0,0 +1 @@
+2147483654|1354784147820|e|log.0|84|105
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/a
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/a b/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/a
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/b
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/b b/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/b
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/c
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/c b/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/c
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/d
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/d b/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/d
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/e
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/e b/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/e
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/f
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/f b/recipes/rsync-replicated-file-system/data/localhost_12001/filestore/f
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/data/localhost_12001/translog/log.0
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/data/localhost_12001/translog/log.0 b/recipes/rsync-replicated-file-system/data/localhost_12001/translog/log.0
new file mode 100644
index 0000000..826dd1c
Binary files /dev/null and b/recipes/rsync-replicated-file-system/data/localhost_12001/translog/log.0 differ

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/pom.xml b/recipes/rsync-replicated-file-system/pom.xml
new file mode 100644
index 0000000..26b52f9
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/pom.xml
@@ -0,0 +1,109 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<groupId>com.linkedin.helix</groupId>
+	<artifactId>rsync-replicated-file-system</artifactId>
+	<packaging>jar</packaging>
+	<version>1.0-SNAPSHOT</version>
+	<name>rsync-replicated-file-system</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.helix</groupId>
+			<artifactId>helix-core</artifactId>
+			<version>0.6-incubating-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>1.2.15</version>
+			<exclusions>
+				<exclusion>
+					<groupId>javax.mail</groupId>
+					<artifactId>mail</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-jci-fam</artifactId>
+			<version>1.0</version>
+		</dependency>
+		<dependency>
+			<groupId>com.github.sgroschupf</groupId>
+			<artifactId>zkclient</artifactId>
+			<version>0.1</version>
+		</dependency>
+	</dependencies>
+	<build>
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<groupId>org.codehaus.mojo</groupId>
+					<artifactId>appassembler-maven-plugin</artifactId>
+					<version>1.1.1</version>
+					<configuration>
+						<!-- Set the target configuration directory to be used in the bin scripts -->
+						<!-- <configurationDirectory>conf</configurationDirectory> -->
+						<!-- Copy the contents from "/src/main/config" to the target configuration 
+							directory in the assembled application -->
+					<!-- <copyConfigurationDirectory>true</copyConfigurationDirectory> -->
+						<!-- Include the target configuration directory in the beginning of 
+							the classpath declaration in the bin scripts -->
+						<includeConfigurationDirectoryInClasspath>true</includeConfigurationDirectoryInClasspath>
+						<assembleDirectory>${project.build.directory}/${project.artifactId}-pkg</assembleDirectory>
+						<!-- Extra JVM arguments that will be included in the bin scripts -->
+						<extraJvmArguments>-Xms512m -Xmx512m</extraJvmArguments>
+						<!-- Generate bin scripts for windows and unix by default -->
+						<platforms>
+							<platform>windows</platform>
+							<platform>unix</platform>
+						</platforms>
+					</configuration>
+					<executions>
+						<execution>
+							<phase>package</phase>
+							<goals>
+								<goal>assemble</goal>
+							</goals>
+						</execution>
+					</executions>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+		<plugins>
+
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>appassembler-maven-plugin</artifactId>
+				<configuration>
+					<programs>
+						<program>
+							<mainClass>org.apache.helix.filestore.IntegrationTest</mainClass>
+							<name>quickdemo</name>
+						</program>
+
+					</programs>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLog.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLog.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLog.java
new file mode 100644
index 0000000..1f41a66
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLog.java
@@ -0,0 +1,13 @@
+package org.apache.helix.filestore;
+
+/**
+ * Placeholder for a change log kept in a directory. Neither the constructor
+ * nor {@link #logChange(String)} performs any work yet.
+ */
+public class ChangeLog
+{
+  /**
+   * @param logDir
+   *          directory of the change log; currently ignored
+   */
+  public ChangeLog(String logDir)
+  {
+    // nothing to initialise yet
+  }
+
+  /**
+   * @param file
+   *          path of the file that changed; currently ignored
+   */
+  public void logChange(String file)
+  {
+    // no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogGenerator.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogGenerator.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogGenerator.java
new file mode 100644
index 0000000..28a7e97
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogGenerator.java
@@ -0,0 +1,130 @@
+package org.apache.helix.filestore;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.nio.charset.Charset;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.helix.filestore.FileSystemWatchService.ChangeType;
+
+/**
+ * Appends one binary entry to a transaction log file for every file-system
+ * change it is notified about.
+ * <p>
+ * Each entry is written as: txid (long), type (short), timestamp (long) and
+ * file path (via {@link DataOutputStream#writeUTF(String)}).
+ */
+public class ChangeLogGenerator implements FileChangeWatcher
+{
+  /** Number of entries after which the log stream is closed and reopened. */
+  private static final int ENTRIES_PER_LOG_FILE = 10000;
+
+  Lock lock;
+  private long currentSeq;
+  private long currentGen;
+  private int entriesLogged;
+  private DataOutputStream out;
+  private final String directory;
+
+  /**
+   * @param directory
+   *          directory that holds the <code>log.N</code> files
+   * @param startGen
+   *          generation placed in the upper 32 bits of every txid
+   * @param startSeq
+   *          sequence number preceding the first one handed out
+   * @throws Exception
+   *           if the log file cannot be opened for appending
+   */
+  public ChangeLogGenerator(String directory, long startGen, long startSeq)
+      throws Exception
+  {
+    this.directory = directory;
+    lock = new ReentrantLock();
+    currentSeq = startSeq;
+    currentGen = startGen;
+    setLogFile();
+  }
+
+  /**
+   * Opens, in append mode, the <code>log.N</code> file with the highest index
+   * N found in the directory (index 1 if there is none above it).
+   */
+  private void setLogFile() throws Exception
+  {
+    String[] list = new File(directory).list();
+    if (list == null)
+    {
+      list = new String[] {};
+    }
+    int max = 1;
+    for (String name : list)
+    {
+      // String.split takes a regular expression: an unescaped "." matches
+      // every character and yields an empty array, so the dot is escaped.
+      String[] split = name.split("\\.");
+      if (split.length == 2 && "log".equals(split[0]))
+      {
+        try
+        {
+          int index = Integer.parseInt(split[1]);
+          if (index > max)
+          {
+            max = index;
+          }
+        } catch (NumberFormatException e)
+        {
+          System.err.println("Invalid transaction log file found:" + name);
+        }
+      }
+    }
+
+    String transLogFile = directory + "/" + "log." + max;
+    System.out.println("Current file name:" + transLogFile);
+    out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(
+        transLogFile, true)));
+  }
+
+  @Override
+  public void onEntryAdded(String path)
+  {
+    appendChange(path, FileSystemWatchService.ChangeType.CREATE);
+  }
+
+  @Override
+  public void onEntryDeleted(String path)
+  {
+    appendChange(path, FileSystemWatchService.ChangeType.DELETE);
+  }
+
+  @Override
+  public void onEntryModified(String path)
+  {
+    appendChange(path, FileSystemWatchService.ChangeType.MODIFY);
+  }
+
+  /**
+   * Appends a record describing a change to the log. Changes to paths that
+   * are currently directories are ignored.
+   *
+   * @param path
+   *          path of the entry that changed
+   * @param type
+   *          kind of change; its ordinal is what gets persisted
+   * @return <code>false</code> if writing the record failed,
+   *         <code>true</code> otherwise (including ignored directories)
+   */
+  public boolean appendChange(String path, ChangeType type)
+  {
+    // Checked before taking the lock: returning early while holding it
+    // would leave the lock held and block every later writer.
+    if (new File(path).isDirectory())
+    {
+      return true;
+    }
+    lock.lock();
+    try
+    {
+      ChangeRecord record = new ChangeRecord();
+      record.file = path;
+      record.timestamp = System.currentTimeMillis();
+      currentSeq++;
+      // generation in the upper 32 bits, sequence number added below it
+      record.txid = (currentGen << 32) + currentSeq;
+      record.type = (short) type.ordinal();
+      write(record);
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+      return false;
+    } finally
+    {
+      lock.unlock();
+    }
+    return true;
+  }
+
+  /**
+   * Serialises one record and flushes it so that readers can see it at once.
+   */
+  private void write(ChangeRecord record) throws Exception
+  {
+    out.writeLong(record.txid);
+    out.writeShort(record.type);
+    out.writeLong(record.timestamp);
+    out.writeUTF(record.file);
+    out.flush();
+    entriesLogged++;
+    if (entriesLogged == ENTRIES_PER_LOG_FILE)
+    {
+      // NOTE(review): setLogFile() picks the highest existing index, so this
+      // reopens the same file instead of starting a new one - confirm
+      // whether rolling over to a fresh file was intended.
+      entriesLogged = 0;
+      out.close();
+      setLogFile();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogProcessor.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogProcessor.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogProcessor.java
new file mode 100644
index 0000000..17ad0d5
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogProcessor.java
@@ -0,0 +1,93 @@
+package org.apache.helix.filestore;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.io.FileUtils;
+
+/**
+ * Processes the change log and invokes rsync for every change on the remote
+ * machine.
+ * <p>
+ * The work happens on a thread created by {@link #start()}: it repeatedly
+ * asks the reader for the changes after the last checkpointed record, rsyncs
+ * each distinct changed path once and then checkpoints the last record of
+ * the batch.
+ * 
+ * @author kgopalak
+ * 
+ */
+public class ChangeLogProcessor implements Runnable
+{
+  private final ChangeLogReader reader;
+  RsyncInvoker rsyncInvoker;
+  private AtomicBoolean shutdownRequested;
+  private CheckpointFile checkpointFile;
+  private Thread thread;
+
+  /**
+   * @param reader
+   *          source of change records
+   * @param remoteHost
+   *          passed through to the {@link RsyncInvoker}
+   * @param remoteBaseDir
+   *          passed through to the {@link RsyncInvoker}
+   * @param localBaseDir
+   *          passed through to the {@link RsyncInvoker}
+   * @param checkpointDirPath
+   *          passed through to the {@link CheckpointFile}
+   * @throws Exception
+   *           if the checkpoint file or the rsync invoker cannot be created
+   */
+  public ChangeLogProcessor(ChangeLogReader reader, String remoteHost,
+      String remoteBaseDir, String localBaseDir, String checkpointDirPath)
+      throws Exception
+  {
+    this.reader = reader;
+    checkpointFile = new CheckpointFile(checkpointDirPath);
+
+    shutdownRequested = new AtomicBoolean(false);
+    rsyncInvoker = new RsyncInvoker(remoteHost, remoteBaseDir, localBaseDir);
+  }
+
+  /**
+   * Starts the processing loop on a new thread.
+   */
+  public void start()
+  {
+    thread = new Thread(this);
+    thread.start();
+  }
+
+  /**
+   * Processing loop; runs at least one iteration and then continues until
+   * {@link #stop()} has been called. A failure in one iteration is printed
+   * and the loop carries on from the last successfully checkpointed record.
+   */
+  public void run()
+  {
+    try
+    {
+      ChangeRecord lastRecordProcessed = checkpointFile.findLastRecordProcessed();
+      do
+      {
+        try
+        {
+          List<ChangeRecord> changes = reader
+              .getChangeSince(lastRecordProcessed);
+          // The reader hands back an empty list when it could not read any
+          // record; there is nothing to sync or checkpoint in that case, and
+          // indexing the last element would throw.
+          if (changes == null || changes.isEmpty())
+          {
+            continue;
+          }
+          for (String path : getRemotePathsToSync(changes))
+          {
+            rsyncInvoker.rsync(path);
+          }
+          lastRecordProcessed = changes.get(changes.size() - 1);
+          checkpointFile.checkpoint(lastRecordProcessed);
+        } catch (Exception e)
+        {
+          e.printStackTrace();
+        }
+      } while (!shutdownRequested.get());
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Collapses a batch of changes into the sorted set of distinct file paths,
+   * so that a file changed several times in one batch is synced only once.
+   */
+  private Set<String> getRemotePathsToSync(List<ChangeRecord> changes)
+  {
+    Set<String> paths = new TreeSet<String>();
+    for (ChangeRecord change : changes)
+    {
+      paths.add(change.file);
+    }
+    return paths;
+  }
+
+  /**
+   * Requests the processing loop to end and interrupts the worker thread.
+   * Safe to call even if {@link #start()} was never invoked.
+   */
+  public void stop()
+  {
+    shutdownRequested.set(true);
+    if (thread != null)
+    {
+      thread.interrupt();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogReader.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogReader.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogReader.java
new file mode 100644
index 0000000..30198e8
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeLogReader.java
@@ -0,0 +1,150 @@
+package org.apache.helix.filestore;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Reads {@link ChangeRecord} entries from the transaction log files in a
+ * directory. It is also a {@link FileChangeWatcher}: any notification wakes
+ * up callers blocked in {@link #getChangeSince(ChangeRecord)}.
+ */
+public class ChangeLogReader implements FileChangeWatcher
+{
+  /** Upper bound on the number of records returned by a single read. */
+  int MAX_ENTRIES_TO_READ = 100;
+  private final String changeLogDir;
+  Lock lock;
+  private Condition condition;
+
+  /**
+   * @param changeLogDir
+   *          directory that holds the transaction log files
+   */
+  public ChangeLogReader(String changeLogDir)
+  {
+    this.changeLogDir = changeLogDir;
+    lock = new ReentrantLock();
+    condition = lock.newCondition();
+  }
+
+  /**
+   * Returns up to {@link #MAX_ENTRIES_TO_READ} records that follow the given
+   * one in its log file. Blocks until the file exists and has grown past the
+   * end offset of the given record.
+   * 
+   * @param record
+   *          last record already seen, or <code>null</code> to read
+   *          <code>log.1</code> from the beginning
+   * @return the records read; may hold fewer records than available (or be
+   *         empty) if an I/O error occurred, and is empty if the calling
+   *         thread was interrupted while waiting
+   */
+  public List<ChangeRecord> getChangeSince(ChangeRecord record)
+  {
+    List<ChangeRecord> changes = new ArrayList<ChangeRecord>();
+    String fileName;
+    long endOffset;
+    if (record == null)
+    {
+      fileName = "log.1";
+      endOffset = 0;
+    } else
+    {
+      fileName = record.changeLogFileName;
+      endOffset = record.endOffset;
+    }
+    File file = new File(changeLogDir + "/" + fileName);
+
+    // Acquire outside the try block so that unlock() in finally only runs
+    // when the lock is actually held.
+    lock.lock();
+    try
+    {
+      while (!file.exists() || file.length() <= endOffset)
+      {
+        try
+        {
+          System.out.println("Waiting for new changes");
+          condition.await();
+          System.out.println("Detected changes");
+        } catch (InterruptedException e)
+        {
+          // Restore the flag and give up waiting so that the caller gets a
+          // chance to notice a shutdown request instead of blocking again.
+          Thread.currentThread().interrupt();
+          return changes;
+        }
+      }
+      RandomAccessFile raf = new RandomAccessFile(file, "r");
+      try
+      {
+        raf.seek(endOffset);
+        // Field order mirrors the writer: txid (long), type (short),
+        // timestamp (long), file (UTF).
+        int count = 0;
+        do
+        {
+          ChangeRecord newRecord = new ChangeRecord();
+          newRecord.changeLogFileName = fileName;
+          newRecord.startOffset = raf.getFilePointer();
+          newRecord.txid = raf.readLong();
+          newRecord.type = raf.readShort();
+          newRecord.timestamp = raf.readLong();
+          newRecord.file = raf.readUTF();
+          newRecord.endOffset = raf.getFilePointer();
+          changes.add(newRecord);
+          count++;
+        } while (count < MAX_ENTRIES_TO_READ
+            && raf.getFilePointer() < raf.length());
+      } finally
+      {
+        // Release the file descriptor on every path, including read errors.
+        raf.close();
+      }
+    } catch (FileNotFoundException e)
+    {
+      e.printStackTrace();
+    } catch (IOException e)
+    {
+      e.printStackTrace();
+    } finally
+    {
+      lock.unlock();
+    }
+    return changes;
+  }
+
+  @Override
+  public void onEntryModified(String path)
+  {
+    signalChange();
+  }
+
+  @Override
+  public void onEntryAdded(String path)
+  {
+    signalChange();
+  }
+
+  @Override
+  public void onEntryDeleted(String path)
+  {
+    signalChange();
+  }
+
+  /**
+   * Wakes up every thread waiting in {@link #getChangeSince(ChangeRecord)} so
+   * that it re-checks the log file.
+   */
+  private void signalChange()
+  {
+    lock.lock();
+    try
+    {
+      condition.signalAll();
+    } finally
+    {
+      lock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeRecord.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeRecord.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeRecord.java
new file mode 100644
index 0000000..567254b
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ChangeRecord.java
@@ -0,0 +1,70 @@
+package org.apache.helix.filestore;
+
+/** A change-log entry; {@code type} and {@code fileFieldLength} are not part of its text form. */
+public class ChangeRecord
+{
+  /**
+   * Transaction Id corresponding to the change that increases monotonically. 31
+   * LSB correspond to sequence number that increments every change. 31 MSB
+   * increments when the master changes
+   */
+  long txid;
+  /** File(s) that were changed */
+  String file;
+  /** Timestamp */
+  long timestamp;
+  /** Type of event like create, modified, deleted */
+  short type;
+
+  transient String changeLogFileName;
+  transient long startOffset;
+  transient long endOffset;
+
+  public short fileFieldLength;
+
+  /** Renders {@code txid|timestamp|file|changeLogFileName|startOffset|endOffset}. */
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append(txid).append("|");
+    sb.append(timestamp).append("|");
+    sb.append(file).append("|");
+    sb.append(changeLogFileName).append("|");
+    sb.append(startOffset).append("|");
+    sb.append(endOffset);
+    return sb.toString();
+  }
+
+  /**
+   * Parses the text produced by {@link #toString()}.
+   *
+   * @param line serialized record; surrounding whitespace (for example a
+   *          trailing newline left by an editor) is ignored
+   * @return the record, or {@code null} if {@code line} is {@code null} or does
+   *         not split into exactly six fields
+   * @throws NumberFormatException if a numeric field is not a valid long
+   */
+  public static ChangeRecord fromString(String line)
+  {
+    if (line == null)
+    {
+      return null;
+    }
+    // Without trim() a trailing newline ends up inside the last numeric field.
+    String[] split = line.trim().split("\\|");
+    if (split.length != 6)
+    {
+      return null;
+    }
+    ChangeRecord record = new ChangeRecord();
+    record.txid = Long.parseLong(split[0]);
+    record.timestamp = Long.parseLong(split[1]);
+    record.file = split[2];
+    record.changeLogFileName = split[3];
+    record.startOffset = Long.parseLong(split[4]);
+    record.endOffset = Long.parseLong(split[5]);
+    return record;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/CheckpointFile.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/CheckpointFile.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/CheckpointFile.java
new file mode 100644
index 0000000..0d44e51
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/CheckpointFile.java
@@ -0,0 +1,76 @@
+package org.apache.helix.filestore;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+
+/** Keeps the last processed {@link ChangeRecord} in a text file inside a directory. */
+public class CheckpointFile
+{
+  private static final String TEMP = ".bak";
+  private static final String CHECK_POINT_FILE = "lastprocessedchange.checkpoint";
+
+  private final String checkpointDirPath;
+
+  public CheckpointFile(String checkpointDirPath) throws IOException
+  {
+    this.checkpointDirPath = checkpointDirPath;
+    File checkpointdir = new File(checkpointDirPath);
+    if (!checkpointdir.exists() && !checkpointdir.mkdirs())
+    {
+      throw new IOException("unable to create SCN file parent:" + checkpointdir.getAbsolutePath());
+    }
+  }
+
+  /**
+   * Deletes any old {@code .bak} file, renames the current checkpoint to it and writes
+   * {@code lastRecordProcessed.toString()} as the new one; failed file steps only log to stderr.
+   */
+  public void checkpoint(ChangeRecord lastRecordProcessed) throws Exception
+  {
+    File tempCheckpointFile = new File(checkpointDirPath + "/" + CHECK_POINT_FILE + TEMP);
+    if (tempCheckpointFile.exists() && !tempCheckpointFile.delete())
+    {
+      System.err.println("unable to erase temp SCN file: " + tempCheckpointFile.getAbsolutePath());
+    }
+    File checkpointfile = new File(checkpointDirPath + "/" + CHECK_POINT_FILE);
+    if (checkpointfile.exists() && !checkpointfile.renameTo(tempCheckpointFile))
+    {
+      System.err.println("unable to backup scn file");
+    }
+    if (!checkpointfile.createNewFile())
+    {
+      System.err.println("unable to create new SCN file:" + checkpointfile.getAbsolutePath());
+    }
+    FileWriter writer = new FileWriter(checkpointfile);
+    try
+    {
+      writer.write(lastRecordProcessed.toString());
+      writer.flush();
+    } finally
+    {
+      // close even when the write fails, so the file handle is not leaked
+      writer.close();
+    }
+    System.out.println("scn persisted: " + lastRecordProcessed.txid);
+  }
+
+  /** Returns the parsed checkpoint, or null if the file is missing or cannot be read. */
+  public ChangeRecord findLastRecordProcessed()
+  {
+    File file = new File(checkpointDirPath + "/" + CHECK_POINT_FILE);
+    if (file.exists())
+    {
+      try
+      {
+        return ChangeRecord.fromString(FileUtils.readFileToString(file));
+      } catch (IOException e)
+      {
+        e.printStackTrace();
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ExternalCommand.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ExternalCommand.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ExternalCommand.java
new file mode 100644
index 0000000..1ee822f
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/ExternalCommand.java
@@ -0,0 +1,401 @@
+package org.apache.helix.filestore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.log4j.Logger;
+
+/** Runs an external process and captures its standard output and error in memory. */
+public class ExternalCommand
+{
+  public static final String MODULE = ExternalCommand.class.getName();
+  public static final Logger LOG = Logger.getLogger(MODULE);
+
+  private final ProcessBuilder _processBuilder;
+  private Process _process;
+  private InputReader _out;
+  private InputReader _err;
+
+  /** Thread that drains a stream into an in-memory buffer until end of stream. */
+  private static class InputReader extends Thread
+  {
+    private static final int BUFFER_SIZE = 2048;
+
+    private final InputStream _in;
+    private final ByteArrayOutputStream _out;
+    // volatile: set by the reader thread, checked by the thread calling getOutput()
+    private volatile boolean _running = false;
+
+    InputReader(InputStream in)
+    {
+      _in = in;
+      _out = new ByteArrayOutputStream();
+    }
+
+    @Override
+    public void run()
+    {
+      _running = true;
+      byte[] buf = new byte[BUFFER_SIZE];
+      int n = 0;
+      try
+      {
+        while((n = _in.read(buf)) != -1)
+          _out.write(buf, 0, n);
+      }
+      catch(IOException e)
+      {
+        LOG.error("error while reading external command", e);
+      }
+      _running = false;
+    }
+
+    public byte[] getOutput()
+    {
+      if(_running)
+        throw new IllegalStateException("wait for process to be completed");
+
+      return _out.toByteArray();
+    }
+  }
+  /**
+   * Constructor */
+  public ExternalCommand(ProcessBuilder processBuilder)
+  {
+    _processBuilder = processBuilder;
+  }
+
+  /**
+   * After creating the command, you have to start it...
+   *
+   * @throws IOException
+   */
+  public void start() throws IOException
+  {
+    _process = _processBuilder.start();
+    // each of the two process streams is drained by its own reader thread
+    _out = new InputReader(new BufferedInputStream(_process.getInputStream()));
+    _err = new InputReader(new BufferedInputStream(_process.getErrorStream()));
+
+    _out.start();
+    _err.start();
+  }
+
+  /**
+   * @see ProcessBuilder
+   */
+  public Map<String, String> getEnvironment()
+  {
+    return _processBuilder.environment();
+  }
+
+  /**
+   * @see ProcessBuilder
+   */
+  public File getWorkingDirectory()
+  {
+    return _processBuilder.directory();
+  }
+
+  /**
+   * @see ProcessBuilder
+   */
+  public void setWorkingDirectory(File directory)
+  {
+    _processBuilder.directory(directory);
+  }
+
+  /**
+   * @see ProcessBuilder
+   */
+  public boolean getRedirectErrorStream()
+  {
+    return _processBuilder.redirectErrorStream();
+  }
+
+  /**
+   * @see ProcessBuilder
+   */
+  public void setRedirectErrorStream(boolean redirectErrorStream)
+  {
+    _processBuilder.redirectErrorStream(redirectErrorStream);
+  }
+
+  public byte[] getOutput() throws InterruptedException
+  {
+    waitFor();
+    return _out.getOutput();
+  }
+
+  public byte[] getError() throws InterruptedException
+  {
+    waitFor();
+    return _err.getOutput();
+  }
+
+  /**
+   * Returns the output as a string.
+   *
+   * @param encoding
+   * @return encoded string
+   * @throws InterruptedException
+   * @throws UnsupportedEncodingException
+   */
+  public String getStringOutput(String encoding) throws InterruptedException,
+                                                        UnsupportedEncodingException
+  {
+    return new String(getOutput(), encoding);
+  }
+
+  /**
+   * Returns the output as a string. Uses encoding "UTF-8".
+   *
+   * @return utf8 encoded string
+   * @throws InterruptedException
+   */
+  public String getStringOutput() throws InterruptedException
+  {
+    try
+    {
+      return getStringOutput("UTF-8");
+    }
+    catch(UnsupportedEncodingException e)
+    {
+      // should not happen
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns the error as a string.
+   *
+   * @param encoding
+   * @return error as string
+   * @throws InterruptedException
+   * @throws UnsupportedEncodingException
+   */
+  public String getStringError(String encoding) throws InterruptedException,
+                                                       UnsupportedEncodingException
+  {
+    return new String(getError(), encoding);
+  }
+
+  /**
+   * Returns the error as a string. Uses encoding "UTF-8".
+   *
+   * @return error as string
+   * @throws InterruptedException
+   */
+  public String getStringError() throws InterruptedException
+  {
+    try
+    {
+      return getStringError("UTF-8");
+    }
+    catch(UnsupportedEncodingException e)
+    {
+      // should not happen
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Properly waits until everything is complete: joins on the thread that
+   * reads the output, joins on the thread that reads the error and finally
+   * wait for the process to be finished.
+   * @return the status code of the process.
+   *
+   * @throws InterruptedException
+   */
+  public int waitFor() throws InterruptedException
+  {
+    if(_process == null)
+      throw new IllegalStateException("you must call start first");
+
+    _out.join();
+    _err.join();
+    return _process.waitFor();
+  }
+
+  /**
+   * Joins the two stream-reader threads, giving up once {@code timeout}
+   * milliseconds have elapsed in total, then waits for the process itself.
+   * @param timeout maximum time in ms to wait for the reader threads
+   * @return the status code of the process.
+   * @throws TimeoutException if a reader thread is still running after the timeout
+   * @throws InterruptedException
+   */
+  public int waitFor(long timeout) throws InterruptedException, TimeoutException
+  {
+    if(_process == null)
+      throw new IllegalStateException("you must call start first");
+    long start = System.nanoTime();
+    joinWithin(_out, timeout);
+    joinWithin(_err, timeout - (System.nanoTime() - start) / 1000000L);
+
+    // there is no timeout in this API, not much we can do here
+    // waiting on the other two threads should give us some safety
+    return _process.waitFor();
+  }
+
+  /** Joins {@code reader} for at most {@code millis} ms; fails if it is still alive. */
+  private static void joinWithin(Thread reader, long millis)
+      throws InterruptedException, TimeoutException
+  {
+    if (millis > 0)
+      reader.join(millis);
+    if (reader.isAlive())
+      throw new TimeoutException("Wait timed out");
+  }
+
+  public int exitValue()
+  {
+    if(_process == null)
+      throw new IllegalStateException("you must call start first");
+
+    return _process.exitValue();
+  }
+
+  public void destroy()
+  {
+    if(_process == null)
+      throw new IllegalStateException("you must call start first");
+
+    _process.destroy();
+  }
+
+  /**
+   * Creates an external process from the command. It is not started and you have to call
+   * start on it!
+   *
+   * @param commands the command to execute
+   * @return the process */
+  public static ExternalCommand create(String... commands)
+  {
+    ExternalCommand ec = new ExternalCommand(new ProcessBuilder(commands));
+    return ec;
+  }
+
+  /**
+   * Creates an external process from the command. It is not started and you have to call
+   * start on it!
+   *
+   * @param commands the command to execute
+   * @return the process */
+  public static ExternalCommand create(List<String> commands)
+  {
+    ExternalCommand ec = new ExternalCommand(new ProcessBuilder(commands));
+    return ec;
+  }
+
+  /**
+   * Creates an external process from the command. The command is executed.
+   *
+   * @param commands the commands to execute
+   * @return the process
+   * @throws IOException if there is an error */
+  public static ExternalCommand start(String... commands) throws IOException
+  {
+    ExternalCommand ec = new ExternalCommand(new ProcessBuilder(commands));
+    ec.start();
+    return ec;
+  }
+
+  /**
+   * Executes the external command in the given working directory and waits for it to be
+   * finished.
+   *
+   * @param workingDirectory the root directory from where to run the command
+   * @param command the command to execute (should be relative to the working directory)
+   * @param args the arguments to the command
+   * @return the process */
+  public static ExternalCommand execute(File workingDirectory,
+                                        String command,
+                                        String... args)
+      throws IOException, InterruptedException
+  {
+    try
+    {
+      return executeWithTimeout(workingDirectory, command, 0, args);
+    }
+    catch (TimeoutException e)
+    {
+      // Can't happen: a timeout of 0 takes the untimed waitFor() path
+      throw new IllegalStateException(MODULE + ".execute: Unexpected timeout occurred!", e);
+    }
+  }
+
+  /**
+   * Executes the external command in the given working directory and waits (until timeout
+   * is elapsed) for it to be finished. The process is destroyed if the wait times out.
+   *
+   * @param workingDirectory the root directory from where to run the command
+   * @param command the command to execute (should be relative to the working directory)
+   * @param timeout maximum wait in ms; a value less than or equal to 0 disables the timeout
+   * @param args the arguments to the command
+   * @return the process
+   */
+  public static ExternalCommand executeWithTimeout(File workingDirectory,
+                                                   String command,
+                                                   long timeout,
+                                                   String... args)
+      throws IOException, InterruptedException, TimeoutException
+  {
+    List<String> arguments = new ArrayList<String>(args.length + 1);
+    arguments.add(new File(workingDirectory, command).getAbsolutePath());
+    arguments.addAll(Arrays.asList(args));
+    ExternalCommand cmd = ExternalCommand.create(arguments);
+    cmd.setWorkingDirectory(workingDirectory);
+    cmd.setRedirectErrorStream(true);
+    cmd.start();
+
+    /* Use timeout if it is a valid value! */
+    if (timeout <= 0)
+      cmd.waitFor();
+    else
+    {
+      try
+      {
+        cmd.waitFor(timeout);
+      }
+      catch (TimeoutException e)
+      {
+        // the caller never receives cmd, so nobody else could stop the process
+        cmd.destroy();
+        throw e;
+      }
+    }
+
+    if (LOG.isDebugEnabled())
+      LOG.debug(cmd.getStringOutput());
+    return cmd;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileChangeWatcher.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileChangeWatcher.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileChangeWatcher.java
new file mode 100644
index 0000000..549da36
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileChangeWatcher.java
@@ -0,0 +1,11 @@
+package org.apache.helix.filestore;
+
+public interface FileChangeWatcher
+{
+  /** Notification that the entry at {@code path} was modified. */
+  void onEntryModified(String path);
+  /** Notification that an entry was added at {@code path}. */
+  void onEntryAdded(String path);
+  /** Notification that the entry at {@code path} was deleted. */
+  void onEntryDeleted(String path);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStore.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStore.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStore.java
new file mode 100644
index 0000000..be94148
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStore.java
@@ -0,0 +1,124 @@
+package org.apache.helix.filestore;
+
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+
+/** Command-line participant that joins the file store cluster and serves until interrupted. */
+public class FileStore
+{
+  private final String _zkAddr;
+  private final String _clusterName;
+  private final String _serverId;
+  private HelixManager _manager = null;
+
+  public FileStore(String zkAddr, String clusterName, String serverId)
+  {
+    _zkAddr = zkAddr;
+    _clusterName = clusterName;
+    _serverId = serverId;
+  }
+
+  /**
+   * Registers the state model factory, connects to the cluster and then blocks
+   * the calling thread until it is interrupted; always disconnects on the way out.
+   */
+  public void connect()
+  {
+    try
+    {
+      _manager = HelixManagerFactory.getZKHelixManager(_clusterName, _serverId,
+          InstanceType.PARTICIPANT, _zkAddr);
+
+      StateMachineEngine stateMach = _manager.getStateMachineEngine();
+      FileStoreStateModelFactory modelFactory =
+          new FileStoreStateModelFactory(_manager);
+      stateMach.registerStateModelFactory(SetupCluster.DEFAULT_STATE_MODEL, modelFactory);
+      _manager.connect();
+      // a thread joining itself never returns normally: park here until interrupted
+      Thread.currentThread().join();
+    }
+    catch (InterruptedException e)
+    {
+      System.err.println(" [-] " + _serverId + " is interrupted ...");
+    }
+    catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+    finally
+    {
+      disconnect();
+    }
+  }
+
+  public void disconnect()
+  {
+    if (_manager != null)
+    {
+      _manager.disconnect();
+    }
+  }
+
+  /** Needs two arguments: {@code args[0]} ZooKeeper address, {@code args[1]} server id. */
+  public static void main(String[] args) throws Exception
+  {
+    if (args.length < 2)
+    {
+      System.err.println("USAGE: java FileStore zookeeperAddress (e.g. localhost:2181) serverId");
+      System.exit(1);
+    }
+
+    final String zkAddr = args[0];
+    final String clusterName = SetupCluster.DEFAULT_CLUSTER_NAME;
+    final String consumerId = args[1];
+
+    ZkClient zkclient = null;
+    try
+    {
+      // add node to cluster if not already added
+      zkclient = new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+          ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+      ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+
+      List<String> nodes = admin.getInstancesInCluster(clusterName);
+      if (!nodes.contains("consumer_" + consumerId))
+      {
+        InstanceConfig config = new InstanceConfig("consumer_" + consumerId);
+        config.setHostName("localhost");
+        config.setInstanceEnabled(true);
+        admin.addInstance(clusterName, config);
+      }
+
+      // start consumer
+      final FileStore consumer =
+          new FileStore(zkAddr, clusterName, "consumer_" + consumerId);
+
+      Runtime.getRuntime().addShutdownHook(new Thread()
+      {
+        @Override
+        public void run()
+        {
+          System.out.println("Shutting down consumer_" + consumerId);
+          consumer.disconnect();
+        }
+      });
+
+      consumer.connect();
+    }
+    finally
+    {
+      if (zkclient != null)
+      {
+        zkclient.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java
new file mode 100644
index 0000000..760bcf1
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java
@@ -0,0 +1,221 @@
+package org.apache.helix.filestore;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+@StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "MASTER",
+    "SLAVE" })
+public class FileStoreStateModel extends StateModel
+{
+  /**
+   * Computes the next generation record for the resource named in the message:
+   * increments {@code currentGen}, restarts {@code currentGenStartSeq} at 1 and
+   * records the previous generation together with the sequence it ended at.
+   */
+  private final class HighWaterMarkUpdater implements DataUpdater<ZNRecord>
+  {
+    private final Message message;
+    private final ChangeRecord lastRecordProcessed;
+
+    private HighWaterMarkUpdater(Message message,
+        ChangeRecord lastRecordProcessed)
+    {
+      this.message = message;
+      this.lastRecordProcessed = lastRecordProcessed;
+    }
+
+    /** Builds the replacement record; a null {@code currentData} yields generation 1. */
+    @Override
+    public ZNRecord update(ZNRecord currentData)
+    {
+      ZNRecord newRec = new ZNRecord(message.getResourceName());
+      if (currentData == null)
+      {
+        newRec.setSimpleField("currentGen", Integer.toString(1));
+        newRec.setSimpleField("currentGenStartSeq", Integer.toString(1));
+        return newRec;
+      }
+
+      // Copy the stored fields first; merging after the setters below would
+      // overwrite the new generation with the old values.
+      newRec.merge(currentData);
+      // Read from the stored record, not from the still-empty newRec.
+      int currentGen = convertToInt(currentData.getSimpleField("currentGen"), 0);
+      newRec.setSimpleField("currentGen", Integer.toString(currentGen + 1));
+      newRec.setSimpleField("currentGenStartSeq", Integer.toString(1));
+      if (currentGen > 0)
+      {
+        newRec.setSimpleField("prevGen", Integer.toString(currentGen));
+        int localEndSeq = 1;
+        if (lastRecordProcessed != null)
+        {
+          // the narrowing cast keeps only the low 32 bits of txid
+          localEndSeq = (int) lastRecordProcessed.txid;
+        }
+        newRec.setSimpleField("prevGenEndSeq", "" + localEndSeq);
+      }
+      return newRec;
+    }
+
+    /** Parses {@code number}; null or non-numeric input gives {@code defaultValue}. */
+    private int convertToInt(String number, int defaultValue)
+    {
+      try
+      {
+        return number != null ? Integer.parseInt(number) : defaultValue;
+      } catch (NumberFormatException ignored)
+      {
+        return defaultValue; // not a number: fall back to the default
+      }
+    }
+  }
+
+  private static final Logger LOG = Logger.getLogger(FileStoreStateModel.class);
+
+  private final String _serverId;
+  private final String _partition;
+
+  private Replicator replicator;
+
+  private ChangeLogGenerator generator;
+
+  private FileSystemWatchService service;
+
+  private InstanceConfig instanceConfig;
+
+  public FileStoreStateModel(HelixManager manager, String resource,
+      String partition)
+  {
+    String clusterName = manager.getClusterName();
+    String instanceName = manager.getInstanceName();
+    instanceConfig = manager.getClusterManagmentTool().getInstanceConfig(
+        clusterName, instanceName);
+    replicator = new Replicator(instanceConfig, resource, partition);
+    try
+    {
+      manager.addExternalViewChangeListener(replicator);
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+    _partition = partition;
+    _serverId = instanceName;
+  }
+
+  /**
+   * If the node is slave, start the rsync thread if it is not started
+   *
+   * @param message
+   * @param context
+   * @throws Exception
+   */
+
+  @Transition(from = "OFFLINE", to = "SLAVE")
+  public void onBecomeSlaveFromOffline(Message message,
+      NotificationContext context) throws Exception
+  {
+    System.out.println(_serverId + " transitioning from " + message.getFromState()
+        + " to " + message.getToState() + " for " + _partition);
+
+    replicator.start();
+    System.out.println(_serverId + " transitioned from " + message.getFromState()
+        + " to " + message.getToState() + " for " + _partition);
+  }
+
+  /**
+   * When the node becomes master, it will start accepting writes and increments
+   * the epoch and starts logging the changes in a file
+   *
+   * @param message
+   * @param context
+   * @throws Exception
+   */
+  @Transition(from = "SLAVE", to = "MASTER")
+  public void onBecomeMasterFromSlave(final Message message,
+      NotificationContext context) throws Exception
+  {
+    replicator.stop();
+    System.out.println(_serverId + " transitioning from " + message.getFromState()
+        + " to " + message.getToState() + " for " + _partition);
+    ZkHelixPropertyStore<ZNRecord> helixPropertyStore = context.getManager()
+        .getHelixPropertyStore();
+    String checkpointDirPath = instanceConfig.getRecord().getSimpleField(
+        "check_point_dir");
+    CheckpointFile checkpointFile = new CheckpointFile(checkpointDirPath);
+    final ChangeRecord lastRecordProcessed = checkpointFile
+        .findLastRecordProcessed();
+    DataUpdater<ZNRecord> updater = new HighWaterMarkUpdater(message,
+        lastRecordProcessed);
+    helixPropertyStore.update(
+        "TRANSACTION_ID_METADATA" + "/" + message.getResourceName(), updater,
+        AccessOption.PERSISTENT);
+    Stat stat = new Stat();
+    // read back the record that the updater has just stored
+    ZNRecord znRecord = helixPropertyStore.get("TRANSACTION_ID_METADATA" + "/"
+        + message.getResourceName(), stat, AccessOption.PERSISTENT);
+    int startGen = Integer.parseInt(znRecord.getSimpleField("currentGen"));
+    int startSeq = Integer.parseInt(znRecord
+        .getSimpleField("currentGenStartSeq"));
+    String fileStoreDir = instanceConfig.getRecord().getSimpleField("file_store_dir");
+    String changeLogDir = instanceConfig.getRecord().getSimpleField("change_log_dir");
+
+    generator = new ChangeLogGenerator(changeLogDir, startGen, startSeq);
+    //To indicate that we need callbacks for changes that happen starting now
+    long now = System.currentTimeMillis();
+    service = new FileSystemWatchService(fileStoreDir, now, generator);
+    service.start();
+    System.out.println(_serverId + " transitioned from " + message.getFromState()
+        + " to " + message.getToState() + " for " + _partition);
+  }
+
+  /**
+   * Stop writing
+   *
+   * @param message
+   * @param context
+   * @throws Exception
+   */
+
+  @Transition(from = "MASTER", to = "SLAVE")
+  public void onBecomeSlaveFromMaster(Message message,
+      NotificationContext context) throws Exception
+  {
+    service.stop();
+    LOG.info(_serverId + " transitioning from " + message.getFromState()
+        + " to " + message.getToState() + " for " + _partition);
+    replicator.start();
+  }
+
+  @Transition(from = "SLAVE", to = "OFFLINE")
+  public void onBecomeOfflineFromSlave(Message message,
+      NotificationContext context)
+  {
+    replicator.stop();
+    LOG.info(_serverId + " transitioning from " + message.getFromState()
+        + " to " + message.getToState() + " for " + _partition);
+  }
+
+  public void onBecomeDroppedFromOffline(Message message,
+      NotificationContext context)
+  {
+    LOG.info(_serverId + " Dropping partition " + _partition);
+  }
+
+  @Override
+  public void reset()
+  {
+    LOG.warn("Default reset() invoked");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
new file mode 100644
index 0000000..5e36874
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
@@ -0,0 +1,21 @@
+package org.apache.helix.filestore;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+public class FileStoreStateModelFactory extends StateModelFactory<FileStoreStateModel>
+{
+  private final HelixManager manager;
+  public FileStoreStateModelFactory(HelixManager manager)
+  {
+    this.manager = manager;
+  }
+  /** Builds the model for {@code partition}; the text before its last '_' is the resource. */
+  @Override
+  public FileStoreStateModel createNewStateModel(String partition)
+  {
+    // cut at the LAST '_' so a resource name that itself contains '_' stays whole
+    int idx = partition.lastIndexOf('_');
+    String resource = idx < 0 ? partition : partition.substring(0, idx);
+    return new FileStoreStateModel(manager, resource, partition);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileSystemWatchService.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileSystemWatchService.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileSystemWatchService.java
new file mode 100644
index 0000000..c4826ed
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileSystemWatchService.java
@@ -0,0 +1,228 @@
+package org.apache.helix.filestore;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.jci.listeners.AbstractFilesystemAlterationListener;
+import org.apache.commons.jci.monitor.FilesystemAlterationMonitor;
+
+/**
+ * Watches a directory through a {@link FilesystemAlterationMonitor} and
+ * forwards created, deleted and modified entries to a set of
+ * {@link FileChangeWatcher}s as paths relative to the watched root.
+ */
+public class FileSystemWatchService
+{
+  enum ChangeType
+  {
+    CREATE, DELETE, MODIFY
+  }
+
+  private final FilesystemAlterationMonitor fam;
+  private final MyFilesystemAlterationListener listener;
+  // Dispatch thread; created by start() and null until then.
+  private Thread thread;
+
+  /**
+   * Watches {@code root} with a start time of -1, i.e. without filtering
+   * entries by modification time.
+   */
+  public FileSystemWatchService(String root, FileChangeWatcher... watchers)
+  {
+    this(root, -1, watchers);
+  }
+
+  /**
+   * @param root directory to watch
+   * @param startTime created/modified entries whose
+   *          {@link File#lastModified()} is below this value are not reported
+   * @param watchers callbacks invoked for every reported entry
+   */
+  public FileSystemWatchService(String root, long startTime,
+      FileChangeWatcher... watchers)
+  {
+    File file = new File(root);
+    System.out.println("Setting up watch service for path:"
+        + file.getAbsolutePath());
+    fam = new FilesystemAlterationMonitor();
+    listener = new MyFilesystemAlterationListener(root, startTime, watchers);
+    fam.addListener(file, listener);
+  }
+
+  /**
+   * Starts the monitor and the thread that dispatches its findings.
+   */
+  public void start()
+  {
+    fam.start();
+    thread = new Thread(listener);
+    thread.start();
+  }
+
+  /**
+   * Collects the changes found by the monitor and, from its own thread,
+   * reports them to the watchers.
+   */
+  static class MyFilesystemAlterationListener extends
+      AbstractFilesystemAlterationListener implements Runnable
+  {
+    private final FileChangeWatcher[] watchers;
+    // Number of leading characters (root path plus one separator) removed
+    // from a reported path to make it relative to the root.
+    private final int length;
+    private final long startTime;
+    private final AtomicBoolean stopRequest;
+
+    public MyFilesystemAlterationListener(String root, long startTime,
+        FileChangeWatcher[] watchers)
+    {
+      this.startTime = startTime;
+      // Measure the root as java.io.File normalizes it (e.g. without a
+      // trailing separator): process() strips this prefix from File paths,
+      // so the raw string length could cut into the relative part.
+      length = new File(root).getPath().length() + 1;
+      this.watchers = watchers;
+      stopRequest = new AtomicBoolean(false);
+    }
+
+    /**
+     * Waits for the next completed check, reports the changes it found,
+     * and repeats until {@link #stop()} has been called.
+     */
+    @SuppressWarnings("unchecked")
+    public void run()
+    {
+      while (!stopRequest.get())
+      {
+        try
+        {
+          waitForCheck();
+          process(getCreatedDirectories(), watchers, ChangeType.CREATE);
+          process(getDeletedDirectories(), watchers, ChangeType.DELETE);
+          process(getChangedDirectories(), watchers, ChangeType.MODIFY);
+          process(getCreatedFiles(), watchers, ChangeType.CREATE);
+          process(getDeletedFiles(), watchers, ChangeType.DELETE);
+          process(getChangedFiles(), watchers, ChangeType.MODIFY);
+        } catch (Exception e)
+        {
+          // Keep the loop alive; one failed round must not end watching.
+          e.printStackTrace();
+        }
+      }
+    }
+
+    /**
+     * Reports each of {@code files} to every watcher as a change of the
+     * given type.
+     */
+    private void process(Collection<File> files, FileChangeWatcher[] watchers,
+        ChangeType type)
+    {
+      for (File file : files)
+      {
+        // A deleted entry no longer exists, so lastModified() is 0 for it;
+        // applying the start-time filter would drop every DELETE whenever
+        // startTime is positive.
+        if (type != ChangeType.DELETE && file.lastModified() < startTime)
+        {
+          continue;
+        }
+        String path = file.getPath();
+        // "." stands for the root itself (nothing left after the prefix).
+        String relativePath = ".";
+        if (path.length() > length)
+        {
+          relativePath = path.substring(length);
+        }
+        for (FileChangeWatcher watcher : watchers)
+        {
+          dispatch(watcher, type, relativePath);
+        }
+      }
+    }
+
+    // Invokes the watcher callback that matches the change type.
+    private static void dispatch(FileChangeWatcher watcher, ChangeType type,
+        String relativePath)
+    {
+      switch (type)
+      {
+      case CREATE:
+        watcher.onEntryAdded(relativePath);
+        break;
+      case DELETE:
+        watcher.onEntryDeleted(relativePath);
+        break;
+      case MODIFY:
+        watcher.onEntryModified(relativePath);
+        break;
+      }
+    }
+
+    /**
+     * Asks {@link #run()} to return after its current round.
+     */
+    public void stop()
+    {
+      stopRequest.set(true);
+    }
+  }
+
+  /**
+   * Stops the listener loop and the monitor, and interrupts the dispatch
+   * thread if start() created one.
+   */
+  public void stop()
+  {
+    listener.stop();
+    fam.stop();
+    if (thread != null)
+    {
+      thread.interrupt();
+    }
+  }
+
+  /**
+   * Demo: prints the changes seen under a sample directory for 10 seconds.
+   */
+  public static void main(String[] args) throws Exception
+  {
+    FileChangeWatcher defaultWatcher = new FileChangeWatcher()
+    {
+
+      @Override
+      public void onEntryModified(String path)
+      {
+        System.out
+            .println("FileSystemWatchService.main(...).new FileChangeWatcher() {...}.onEntryModified():"
+                + path);
+      }
+
+      @Override
+      public void onEntryDeleted(String path)
+      {
+        System.out
+            .println("FileSystemWatchService.main(...).new FileChangeWatcher() {...}.onEntryDeleted():"
+                + path);
+      }
+
+      @Override
+      public void onEntryAdded(String path)
+      {
+        System.out
+            .println("FileSystemWatchService.main(...).new FileChangeWatcher() {...}.onEntryAdded() : "
+                + path);
+      }
+    };
+    // NOTE(review): the generator is not used below; it is still constructed
+    // in case its constructor has side effects this demo relies on.
+    ChangeLogGenerator changeLogGenerator = new ChangeLogGenerator(
+        "data/localhost_12000/translog", 1, 1);
+    FileSystemWatchService watchService = new FileSystemWatchService(
+        "data/localhost_12000/filestore", defaultWatcher);
+    watchService.start();
+    Thread.sleep(10000);
+    watchService.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/IntegrationTest.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/IntegrationTest.java
new file mode 100644
index 0000000..9167549
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/IntegrationTest.java
@@ -0,0 +1,212 @@
+package org.apache.helix.filestore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.StateModelConfigGenerator;
+
+/**
+ * Manual end-to-end demo: starts a local ZooKeeper server, a three-node
+ * "file-store-test" cluster and a controller, writes files into the
+ * filestore directory of one node, interrupts that node and writes more
+ * files into another node's directory, printing the directory listings and
+ * the external view along the way.
+ */
+public class IntegrationTest
+{
+
+  /**
+   * Runs the demo; never returns normally because the thread finally joins
+   * itself to keep the JVM (and with it the cluster) alive.
+   */
+  public static void main(String[] args) throws InterruptedException
+  {
+    try
+    {
+      String baseDir = "/tmp/IntegrationTest/";
+      final String dataDir = baseDir + "zk/dataDir";
+      // baseDir already ends with a separator; keep the ZooKeeper log next
+      // to the data directory instead of under a nested "//tmp".
+      final String logDir = baseDir + "zk/logDir";
+      FileUtils.deleteDirectory(new File(dataDir));
+      FileUtils.deleteDirectory(new File(logDir));
+
+      IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
+      {
+        @Override
+        public void createDefaultNameSpace(ZkClient zkClient)
+        {
+          // nothing to pre-create
+        }
+      };
+      int zkPort = 2199;
+      final String zkAddress = "localhost:" + zkPort;
+
+      // The server is intentionally never shut down: main() blocks forever
+      // below so the cluster stays up for manual experiments.
+      ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace,
+          zkPort);
+      server.start();
+      ClusterSetup setup = new ClusterSetup(zkAddress);
+      final String clusterName = "file-store-test";
+      setup.deleteCluster(clusterName);
+      setup.addCluster(clusterName, true);
+      setup.addInstanceToCluster(clusterName, "localhost", 12001);
+      setup.addInstanceToCluster(clusterName, "localhost", 12002);
+      setup.addInstanceToCluster(clusterName, "localhost", 12003);
+      setup.addResourceToCluster(clusterName, "repository", 1, "MasterSlave");
+      setup.reblanceResource(clusterName, "repository", 3);
+      // Set the configuration
+      final String instanceName1 = "localhost_12001";
+      addConfiguration(setup, baseDir, clusterName, instanceName1);
+      final String instanceName2 = "localhost_12002";
+      addConfiguration(setup, baseDir, clusterName, instanceName2);
+      final String instanceName3 = "localhost_12003";
+      addConfiguration(setup, baseDir, clusterName, instanceName3);
+
+      // START NODES
+      Thread thread1 = newNodeThread(zkAddress, clusterName, instanceName1);
+      Thread thread2 = newNodeThread(zkAddress, clusterName, instanceName2);
+      Thread thread3 = newNodeThread(zkAddress, clusterName, instanceName3);
+      System.out.println("STARTING NODES");
+      thread1.start();
+      thread2.start();
+      thread3.start();
+
+      // Start Controller
+      final HelixManager manager = HelixControllerMain.startHelixController(
+          zkAddress, clusterName, "controller", HelixControllerMain.STANDALONE);
+      Thread.sleep(5000);
+      printStatus(manager);
+      listFiles(baseDir);
+      System.out.println("Writing files a.txt and b.txt to current master "
+          + baseDir + "localhost_12001" + "/filestore");
+      FileUtils.writeStringToFile(new File(baseDir + "localhost_12001"
+          + "/filestore/a.txt"), "some_data in a");
+      FileUtils.writeStringToFile(new File(baseDir + "localhost_12001"
+          + "/filestore/b.txt"), "some_data in b");
+      Thread.sleep(10000);
+      listFiles(baseDir);
+      Thread.sleep(5000);
+      System.out.println("Stopping the MASTER node:" + "localhost_12001");
+      thread1.interrupt();
+      Thread.sleep(10000);
+      printStatus(manager);
+      System.out.println("Writing files c.txt and d.txt to current master "
+          + baseDir + "localhost_12002" + "/filestore");
+      FileUtils.writeStringToFile(new File(baseDir + "localhost_12002"
+          + "/filestore/c.txt"), "some_data in c");
+      FileUtils.writeStringToFile(new File(baseDir + "localhost_12002"
+          + "/filestore/d.txt"), "some_data in d");
+      listFiles(baseDir);
+      System.out.println("Create or modify any files under " + baseDir
+          + "localhost_12002" + "/filestore"
+          + " and it should get replicated to " + baseDir + "localhost_12003"
+          + "/filestore");
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+    Thread.currentThread().join();
+  }
+
+  /**
+   * Creates (without starting) a thread that connects a {@link FileStore}
+   * for the given instance and disconnects it again if that fails.
+   */
+  private static Thread newNodeThread(final String zkAddress,
+      final String clusterName, final String instanceName)
+  {
+    return new Thread(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        FileStore fileStore = null;
+        try
+        {
+          fileStore = new FileStore(zkAddress, clusterName, instanceName);
+          fileStore.connect();
+        } catch (Exception e)
+        {
+          System.err.println("Exception" + e);
+          // The constructor itself may have thrown, in which case there is
+          // nothing to disconnect.
+          if (fileStore != null)
+          {
+            fileStore.disconnect();
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Prints the content of every instance's filestore directory.
+   */
+  private static void listFiles(String baseDir)
+  {
+    System.out.println("===============FILES===============================");
+    String[] instances = new String[] { "localhost_12001", "localhost_12002",
+        "localhost_12003" };
+    for (String instance : instances)
+    {
+      String dir = baseDir + instance + "/filestore";
+      String[] list = new File(dir).list();
+      System.out.println(dir + ":"
+          + ((list != null) ? Arrays.toString(list) : "NONE"));
+    }
+    System.out.println("===============FILES===============================");
+  }
+
+  /**
+   * Prints the external view of the "repository" resource.
+   */
+  private static void printStatus(final HelixManager manager)
+  {
+    System.out.println("CLUSTER STATUS");
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = helixDataAccessor.keyBuilder();
+    System.out.println("External View \n"
+        + helixDataAccessor.getProperty(keyBuilder.externalView("repository")));
+  }
+
+  /**
+   * Stores the translog, filestore and checkpoint directories of
+   * {@code instanceName} in its participant config and recreates those
+   * directories empty on disk.
+   */
+  private static void addConfiguration(ClusterSetup setup, String baseDir,
+      String clusterName, String instanceName) throws IOException
+  {
+    Map<String, String> properties = new HashMap<String, String>();
+    ConfigScopeBuilder builder = new ConfigScopeBuilder();
+    ConfigScope instanceScope = builder.forCluster(clusterName)
+        .forParticipant(instanceName).build();
+    properties.put("change_log_dir", baseDir + instanceName + "/translog");
+    properties.put("file_store_dir", baseDir + instanceName + "/filestore");
+    properties.put("check_point_dir", baseDir + instanceName + "/checkpoint");
+    setup.getClusterManagementTool().setConfig(instanceScope, properties);
+    FileUtils.deleteDirectory(new File(properties.get("change_log_dir")));
+    FileUtils.deleteDirectory(new File(properties.get("file_store_dir")));
+    FileUtils.deleteDirectory(new File(properties.get("check_point_dir")));
+    new File(properties.get("change_log_dir")).mkdirs();
+    new File(properties.get("file_store_dir")).mkdirs();
+    new File(properties.get("check_point_dir")).mkdirs();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/Replicator.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/Replicator.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/Replicator.java
new file mode 100644
index 0000000..dddf9c9
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/Replicator.java
@@ -0,0 +1,227 @@
+package org.apache.helix.filestore;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+
+/**
+ * Replicates the file store of the current MASTER of a partition to the
+ * local instance: the master's change log directory is rsynced in the
+ * background, and a {@link ChangeLogProcessor} is started on the local copy
+ * of that log. External view changes are used to follow master changes.
+ */
+public class Replicator extends RoutingTableProvider
+{
+  private InstanceConfig currentMasterConfig;
+  private final InstanceConfig localInstanceConfig;
+
+  private final String partition;
+  private final String resourceName;
+  // True between start() and stop(); while set, external view changes
+  // re-evaluate which master to replicate from.
+  AtomicBoolean isReplicationInitiated;
+  // True while the rsync, watch and processor components are running.
+  AtomicBoolean isReplicationStarted;
+  RsyncInvoker rsyncInvoker;
+  private ChangeLogProcessor processor;
+  private FileSystemWatchService watchService;
+  private ChangeLogReader reader;
+
+  public void setRsyncInvoker(RsyncInvoker rsyncInvoker)
+  {
+    this.rsyncInvoker = rsyncInvoker;
+  }
+
+  /**
+   * @param localInstanceConfig config of this instance; its record supplies
+   *          the local change log, file store and checkpoint directories
+   * @param resourceName resource whose master is looked up
+   * @param partition partition whose master is looked up
+   */
+  public Replicator(InstanceConfig localInstanceConfig, String resourceName,
+      String partition)
+  {
+    this.localInstanceConfig = localInstanceConfig;
+    this.resourceName = resourceName;
+    this.partition = partition;
+    isReplicationInitiated = new AtomicBoolean(false);
+    isReplicationStarted = new AtomicBoolean(false);
+  }
+
+  /**
+   * Looks up the MASTER of the partition and starts replicating from it
+   * unless replication from that instance is already running. When the
+   * master has changed, replication from the previous one is stopped first.
+   * Does nothing beyond logging if there is no master or more than one.
+   *
+   * @throws Exception if replication from the master cannot be started
+   */
+  public void start() throws Exception
+  {
+    isReplicationInitiated.set(true);
+
+    List<InstanceConfig> instances = getInstances(resourceName, partition,
+        "MASTER");
+    if (instances.isEmpty())
+    {
+      System.out.println("No master found");
+      return;
+    }
+    if (instances.size() != 1)
+    {
+      System.out.println("Invalid number of masters found:" + instances);
+      return;
+    }
+    InstanceConfig newMasterConfig = instances.get(0);
+    String master = newMasterConfig.getInstanceName();
+    if (isReplicationStarted.get() && currentMasterConfig != null
+        && master.equalsIgnoreCase(currentMasterConfig.getInstanceName()))
+    {
+      System.out.println("Already replicating from " + master);
+      return;
+    }
+    System.out.println("Found new master:" + master);
+    // Only the running components are stopped here. Going through stop()
+    // would also clear isReplicationInitiated and make this replicator
+    // ignore every later external view change.
+    stopReplication();
+    currentMasterConfig = newMasterConfig;
+    startReplication(currentMasterConfig);
+  }
+
+  /**
+   * Starts the background rsync of the master's change log directory, the
+   * watch service on the local copy and the change log processor.
+   *
+   * @param masterInstanceConfig config of the instance to replicate from
+   * @throws Exception if one of the components cannot be started
+   */
+  public void startReplication(InstanceConfig masterInstanceConfig)
+      throws Exception
+  {
+    String remoteHost = masterInstanceConfig.getHostName();
+    String remoteChangeLogDir = masterInstanceConfig.getRecord()
+        .getSimpleField("change_log_dir");
+    String remoteFilestoreDir = masterInstanceConfig.getRecord()
+        .getSimpleField("file_store_dir");
+
+    String localChangeLogDir = localInstanceConfig.getRecord().getSimpleField(
+        "change_log_dir");
+    String localFilestoreDir = localInstanceConfig.getRecord().getSimpleField(
+        "file_store_dir");
+    String localcheckpointDir = localInstanceConfig.getRecord().getSimpleField(
+        "check_point_dir");
+    // setup rsync for the change log directory
+    setupRsync(remoteHost, remoteChangeLogDir, localChangeLogDir);
+    reader = new ChangeLogReader(localChangeLogDir);
+    watchService = new FileSystemWatchService(localChangeLogDir, reader);
+    processor = new ChangeLogProcessor(reader, remoteHost, remoteFilestoreDir,
+        localFilestoreDir, localcheckpointDir);
+    watchService.start();
+    processor.start();
+    isReplicationStarted.set(true);
+  }
+
+  /**
+   * Replaces the current rsync invoker with one for the given directories
+   * and starts its background loop.
+   */
+  private void setupRsync(String remoteHost, String remoteBaseDir,
+      String localBaseDir) throws Exception
+  {
+    // Do not leave an invoker from an earlier attempt running once the
+    // field no longer points at it.
+    if (rsyncInvoker != null)
+    {
+      rsyncInvoker.stop();
+    }
+    rsyncInvoker = new RsyncInvoker(remoteHost, remoteBaseDir, localBaseDir);
+    boolean started = rsyncInvoker.runInBackground();
+    if (!started)
+    {
+      throw new IllegalStateException("Unable to start rsync thread");
+    }
+    System.out.println("Rsync thread started in background");
+  }
+
+  /**
+   * Stops the running components, if any, and forgets the current master so
+   * that a later {@link #start()} begins replication again.
+   */
+  private void stopReplication()
+  {
+    // getAndSet makes a repeated call a no-op for the components.
+    if (isReplicationStarted.getAndSet(false))
+    {
+      // currentMasterConfig is null when startReplication() was called
+      // directly rather than through start().
+      System.out.println("Stopping replication from current master:"
+          + (currentMasterConfig == null ? "unknown" : currentMasterConfig
+              .getInstanceName()));
+      rsyncInvoker.stop();
+      watchService.stop();
+      processor.stop();
+    }
+    currentMasterConfig = null;
+  }
+
+  /**
+   * Stops replication and stops reacting to external view changes until
+   * {@link #start()} is called again.
+   */
+  public void stop()
+  {
+    stopReplication();
+    isReplicationInitiated.set(false);
+  }
+
+  /**
+   * Updates the routing table and, while replication is initiated,
+   * re-evaluates the master via {@link #start()}.
+   */
+  @Override
+  public void onExternalViewChange(List<ExternalView> viewList,
+      NotificationContext context)
+  {
+    super.onExternalViewChange(viewList, context);
+
+    if (isReplicationInitiated.get())
+    {
+      try
+      {
+        start();
+      } catch (Exception e)
+      {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Demo: replicates data/localhost_12000 into data/localhost_12001.
+   */
+  public static void main(String[] args) throws Exception
+  {
+    InstanceConfig localInstanceConfig = new InstanceConfig("localhost_12001");
+    ZNRecord record = localInstanceConfig.getRecord();
+    record.setSimpleField("change_log_dir", "data/localhost_12001/translog");
+    record.setSimpleField("file_store_dir", "data/localhost_12001/filestore");
+    record.setSimpleField("check_point_dir", "data/localhost_12001/checkpoint");
+    // The master is the other instance; its name matches the directories
+    // configured for it below.
+    InstanceConfig masterInstanceConfig = new InstanceConfig("localhost_12000");
+    record = masterInstanceConfig.getRecord();
+    record.setSimpleField("change_log_dir", "data/localhost_12000/translog");
+    record.setSimpleField("file_store_dir", "data/localhost_12000/filestore");
+    record.setSimpleField("check_point_dir", "data/localhost_12000/checkpoint");
+    Replicator replicator = new Replicator(localInstanceConfig, "resource",
+        "partition");
+    replicator.startReplication(masterInstanceConfig);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncDaemon.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncDaemon.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncDaemon.java
new file mode 100644
index 0000000..45f1763
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncDaemon.java
@@ -0,0 +1,45 @@
+package org.apache.helix.filestore;
+
+import java.io.IOException;
+
+/**
+ * Starts the rsync daemon by running {@code sudo rsync --daemon}.
+ */
+public class RsyncDaemon
+{
+  /**
+   * Runs {@code sudo rsync --daemon} and waits for the command to exit.
+   *
+   * @return {@code true} if the command exited with status 0;
+   *         {@code false} if it could not be launched, exited with another
+   *         status, or the wait was interrupted
+   */
+  public boolean start()
+  {
+    ProcessBuilder pb = new ProcessBuilder("sudo", "rsync", "--daemon");
+    ExternalCommand externalCommand = new ExternalCommand(pb);
+    try
+    {
+      // The command must be launched before its exit code can be awaited.
+      externalCommand.start();
+      return externalCommand.waitFor() == 0;
+    } catch (IOException e)
+    {
+      e.printStackTrace();
+    } catch (InterruptedException e)
+    {
+      e.printStackTrace();
+      // Keep the interrupt visible to the caller.
+      Thread.currentThread().interrupt();
+    }
+    return false;
+  }
+
+  /**
+   * Does not stop anything; always reports success.
+   */
+  public boolean stop()
+  {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncInvoker.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncInvoker.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncInvoker.java
new file mode 100644
index 0000000..2c94c5e
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/RsyncInvoker.java
@@ -0,0 +1,167 @@
+package org.apache.helix.filestore;
+
+import java.io.IOException;
+
+/**
+ * Runs the {@code rsync} command to copy a source directory into a local
+ * directory, either once for a single path or repeatedly from a background
+ * thread.
+ * <p>
+ * NOTE(review): {@code remoteHost} is stored but never made part of the
+ * rsync source argument, so the source is resolved as a local path;
+ * confirm whether a {@code host:} prefix was intended.
+ */
+public class RsyncInvoker
+{
+  // Delay before each background run and after a successful one.
+  private static final int INITIAL_SLEEP_MS = 1000;
+  // Upper bound for the delay, which doubles after every failed run.
+  private static final int MAX_SLEEP_MS = 2 * 60 * 1000;
+
+  private Thread backgroundThread;
+  private final String remoteHost;
+  private final String remoteLogDir;
+  private final String localLogDir;
+
+  /**
+   * @param remoteHost host name of the source (currently unused)
+   * @param remoteBaseDir source directory
+   * @param localBaseDir destination directory
+   */
+  public RsyncInvoker(String remoteHost, String remoteBaseDir,
+      String localBaseDir)
+  {
+    this.remoteHost = remoteHost;
+    this.remoteLogDir = remoteBaseDir;
+    this.localLogDir = localBaseDir;
+  }
+
+  /**
+   * Copies {@code relativePath} (relative to the source directory) into the
+   * destination directory and waits for rsync to finish.
+   *
+   * @return {@code true} if rsync exited with status 0
+   */
+  public boolean rsync(String relativePath)
+  {
+    String source = remoteLogDir + "/" + relativePath;
+    try
+    {
+      ProcessBuilder pb = new ProcessBuilder("rsync", source, localLogDir);
+      System.out.println("Rsyncing source:" + source + " dest:" + localLogDir);
+      ExternalCommand ec = new ExternalCommand(pb);
+      ec.start();
+      if (ec.waitFor() == 0)
+      {
+        return true;
+      }
+      System.out.println("Failed to rsync " + ec.getStringError());
+    } catch (IOException e)
+    {
+      e.printStackTrace();
+    } catch (InterruptedException e)
+    {
+      e.printStackTrace();
+      // Keep the interrupt visible to the caller.
+      Thread.currentThread().interrupt();
+    }
+
+    return false;
+  }
+
+  /**
+   * Interrupts the background thread, if one was started.
+   *
+   * @return always {@code true}
+   */
+  public boolean stop()
+  {
+    if (backgroundThread != null)
+    {
+      backgroundThread.interrupt();
+    }
+    return true;
+  }
+
+  /**
+   * Starts a thread that rsyncs the whole source directory over and over
+   * until {@link #stop()} interrupts it. A loop started by an earlier call
+   * is interrupted first so that it is not left running without a handle.
+   *
+   * @return always {@code true}
+   */
+  public boolean runInBackground()
+  {
+    stop();
+    backgroundThread = new Thread(new Runnable()
+    {
+      public void run()
+      {
+        syncUntilInterrupted();
+      }
+    });
+    backgroundThread.start();
+    return true;
+  }
+
+  // Sleeps, syncs and adapts the delay: reset after a success, doubled (up
+  // to MAX_SLEEP_MS) after a failure. Ends when the thread is interrupted.
+  private void syncUntilInterrupted()
+  {
+    int sleep = INITIAL_SLEEP_MS;
+    try
+    {
+      while (true)
+      {
+        Thread.sleep(sleep);
+        if (syncOnce())
+        {
+          sleep = INITIAL_SLEEP_MS;
+        } else
+        {
+          sleep = Math.min(2 * sleep, MAX_SLEEP_MS);
+          System.out.println("Failed to rsync retrying in " + sleep / 1000
+              + " seconds");
+        }
+      }
+    } catch (InterruptedException e)
+    {
+      e.printStackTrace();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  // Runs one "rsync -rvt" of the whole source directory, printing the text
+  // from getStringError() if there is any; returns whether it exited with 0.
+  private boolean syncOnce() throws InterruptedException
+  {
+    int exitVal = -1;
+    try
+    {
+      ProcessBuilder pb = new ProcessBuilder("rsync", "-rvt", remoteLogDir
+          + "/", localLogDir);
+      ExternalCommand ec = new ExternalCommand(pb);
+      ec.start();
+      exitVal = ec.waitFor();
+      String stringError = ec.getStringError();
+      if (stringError != null && stringError.length() > 0)
+      {
+        System.err.println(stringError);
+      }
+    } catch (IOException e)
+    {
+      e.printStackTrace();
+    }
+    return exitVal == 0;
+  }
+
+  public static void main(String[] args)
+  {
+    String remoteHost = "localhost";
+    String remoteLogDir = "data/localhost_12000/translog";
+    String localLogDir = "data/localhost_12001/translog";
+    RsyncInvoker rsyncInvoker = new RsyncInvoker(remoteHost, remoteLogDir,
+        localLogDir);
+    rsyncInvoker.runInBackground();
+  }
+}