Posted to commits@sling.apache.org by ro...@apache.org on 2017/10/18 23:23:40 UTC

[sling-org-apache-sling-cassandra] 01/07: SLING-2798 - Cassandra Resource Provider for Sling, contributed by Dishara Wijewardana, thanks!

This is an automated email from the ASF dual-hosted git repository.

rombert pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-cassandra.git

commit 865dc7f437efeb99490d1387f254710d3f731cb0
Author: Bertrand Delacretaz <bd...@apache.org>
AuthorDate: Mon Dec 23 15:06:29 2013 +0000

    SLING-2798 - Cassandra Resource Provider for Sling, contributed by Dishara Wijewardana, thanks!
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1553124 13f79535-47bb-0310-9956-ffa450edef68
---
 README.txt                                         |  73 +++++
 pom.xml                                            | 272 +++++++++++++++++
 .../resource/provider/CassandraConstants.java      |  36 +++
 .../resource/provider/CassandraIterable.java       |  40 +++
 .../resource/provider/CassandraResource.java       | 245 ++++++++++++++++
 .../provider/CassandraResourceProvider.java        | 325 +++++++++++++++++++++
 .../provider/CassandraResourceResolver.java        | 144 +++++++++
 .../resource/provider/mapper/CassandraMapper.java  |  33 +++
 .../provider/mapper/CassandraMapperException.java  |  41 +++
 .../mapper/DefaultCassandraMapperImpl.java         |  47 +++
 .../provider/security/AccessControlUtil.java       | 221 ++++++++++++++
 .../util/CassandraResourceProviderUtil.java        | 146 +++++++++
 .../data/populator/CassandraDataAddLoadTest.java   | 120 ++++++++
 .../test/data/populator/CassandraDataAddTest.java  |  95 ++++++
 .../CassandraDataChildNodeIterableTest.java        | 113 +++++++
 .../data/populator/CassandraDataChildNodeTest.java | 114 ++++++++
 .../populator/CassandraDataParentNodeTest.java     | 106 +++++++
 .../test/data/populator/CassandraDataReadTest.java | 105 +++++++
 .../CassandraModifyResourceProviderAddTest.java    |  88 ++++++
 .../CassandraModifyResourceProviderDeleteTest.java |  88 ++++++
 .../CassandraModifyResourceProviderRevertTest.java |  83 ++++++
 ...andraResourceProviderPerformanceTestClient.java | 156 ++++++++++
 .../perf/CassandraResourceProviderCreateTest.java  | 172 +++++++++++
 .../perf/CassandraResourceProviderDeleteTest.java  | 135 +++++++++
 .../perf/CassandraResourceProviderUpdateTest.java  | 170 +++++++++++
 25 files changed, 3168 insertions(+)

diff --git a/README.txt b/README.txt
new file mode 100644
index 0000000..582187f
--- /dev/null
+++ b/README.txt
@@ -0,0 +1,73 @@
+Cassandra Resource Provider for Apache Sling
+============================================
+Initially contributed by Dishara Wijewardana as part of GSoC 2013.
+
+This document explains how to set up the Cassandra resource provider with
+Sling trunk.
+
+- Prerequisites
+   Maven 3 must be installed.
+   A Cassandra server running in the background.
+   An up and running Sling launchpad server (built from trunk).
+
+How To:  
+-  Build the source code with Maven 3.x
+-  Find the jar file inside the target folder.
+-  Open the OSGi web console provided by Sling and install the jar as a new OSGi bundle.
+-  The console should now show the bundle as installed.
+
+The Cassandra Resource Provider is now up and running inside the Sling container.
+
+- Test
+
+Under src/test/java/org/apache/sling/cassandra/test/data/populator, you will find several sample
+tests that can be run against the running instance. They cover adding, deleting, updating,
+and creating Cassandra resources in Sling. The performance tests under the perf package
+produce latency reports for their test runs. There is also a newly added utility class,
+AccessControlUtil, which evaluates the privileges of a user at a given path (it is only an
+executable utility class and is not yet incorporated into the proper
+interfaces).
+
+The following sample code shows how to use the Cassandra Resource Provider.
+
+TO CREATE
+
+            String path1 ="/content/cassandra/movies/xmen";
+
+            CassandraResourceProvider cassandraResourceProvider = new CassandraResourceProvider();
+            createColumnFamily("movies", cassandraResourceProvider.getKeyspace(), new StringSerializer());
+            cassandraResourceProvider.setColumnFamily("movies");
+
+            Map<String,Object> map1 = new HashMap<String, Object>();
+            map1.put("metadata", "resolutionPathInfo=json");
+            map1.put("resourceType", "nt:cassandra0");
+            map1.put("resourceSuperType", "nt:supercass1");
+
+            CassandraResourceResolver resolver =  new CassandraResourceResolver();
+            cassandraResourceProvider.create(resolver,path1,map1);
+            cassandraResourceProvider.commit(resolver);
+
+
+TO READ
+An HTTP call to http://localhost:8080/content/cassandra/movies/xmen returns the JSON view of the resource,
+because the metadata set above already specifies json as the resolution path info.
+
+TO DELETE
+            cassandraResourceProvider.delete(resolver,"/content/cassandra/movies/xmen");
+            cassandraResourceProvider.commit(resolver);
+
+
+
+ 
+
+
+
+
+    
+
+
+
+
+
+
+
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..a7b7b4d
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,272 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <!--<parent>-->
+        <!--<groupId>org.apache.sling</groupId>-->
+        <!--<artifactId>sling</artifactId>-->
+        <!--<version>16</version>-->
+        <!--<relativePath>../../../parent/pom.xml</relativePath>-->
+    <!--</parent>-->
+    <groupId>org.apache.sling</groupId>
+    <artifactId>org.apache.sling.cassandra</artifactId>
+    <packaging>bundle</packaging>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>Apache Sling Cassandra Resource Provider</name>
+    <description>
+        Provides a ResourceProvider implementation supporting Apache Cassandra
+        based resources.
+    </description>
+
+    <repositories>
+      <repository>
+        <id>apache.incubating</id>
+        <name>Apache Incubating Repository</name>
+        <url>http://people.apache.org/repo/m2-incubating-repository</url>
+      </repository>
+<repository>
+    <id>java.ts</id>
+    <name>Repository hosting the jee6 artifacts</name>
+    <url>http://repo1.maven.org/maven2</url>
+  </repository>
+    </repositories>
+    <!--<pluginRepositories>-->
+      <!--<pluginRepository>-->
+        <!--<id>apache.incubating.plugins</id>-->
+        <!--<name>Apache Incubating Plugin Repository</name>-->
+        <!--<url>http://people.apache.org/repo/m2-incubating-repository</url>-->
+      <!--</pluginRepository>-->
+    <!--</pluginRepositories>-->
+
+     <!--TODO uncomment the following when releasing this bundle-->
+    <!--<scm>-->
+        <!--<connection>scm:svn:http://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/fsresource</connection>-->
+        <!--<developerConnection>scm:svn:https://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/fsresource</developerConnection>-->
+        <!--<url>http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/fsresource</url>-->
+    <!--</scm>-->
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-scr-plugin</artifactId>
+                <configuration>
+                    <specVersion>1.1</specVersion>
+                </configuration>
+                <executions>
+                   <execution>
+                      <id>generate-scr-scrdescriptor</id>
+                       <goals>
+                        <goal>scr</goal>
+                       </goals>
+                   </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.sling</groupId>
+                <artifactId>maven-sling-plugin</artifactId>
+                <version>2.1.0</version>
+                <executions>
+                    <execution>
+                        <id>generate-adapter-metadata</id>
+                        <phase>process-classes</phase>
+                        <goals>
+                            <goal>generate-adapter-metadata</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Private-Package>
+                            org.apache.sling.fsprovider.internal
+                        </Private-Package>
+                        <Import-Package>
+                            !com.eaio.*,
+			    !com.ecyrd.*,	
+                            !com.sun.java_cup.internal.*, 
+			    *
+                        </Import-Package>
+                            <Embed-Transitive>true</Embed-Transitive>
+                            <Embed-Dependency>hector-core,jaxp-ri,FastInfoset,cassandra-thrift,commons-pool,libthrift,httpclient,httpcore</Embed-Dependency>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+
+<dependency>
+	<groupId>com.sun.org.apache</groupId>
+	<artifactId>jaxp-ri</artifactId>
+	<version>1.4</version>
+</dependency>
+<dependency>
+	<groupId>com.sun.xml.fastinfoset</groupId>
+	<artifactId>FastInfoset</artifactId>
+	<version>1.2.8</version>
+</dependency>
+<dependency>
+	<groupId>org.apache.cassandra</groupId>
+	<artifactId>cassandra-thrift</artifactId>
+	<version>1.1.0</version>
+</dependency>
+<dependency>
+	<groupId>commons-pool</groupId>
+	<artifactId>commons-pool</artifactId>
+	<version>1.5.5</version>
+</dependency>
+ <dependency>
+	<groupId>org.apache.thrift</groupId>
+	<artifactId>libthrift</artifactId>
+	<version>0.9.0</version>
+</dependency>
+<dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>httpclient</artifactId>
+        <version>4.2</version>
+</dependency>
+<dependency>
+	<groupId>org.apache.httpcomponents</groupId>
+	<artifactId>httpcore</artifactId>
+	<version>4.2</version>
+</dependency>
+                        
+            
+
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+            <version>2.5</version>
+        </dependency>
+
+        <!--<dependency>-->
+            <!--<groupId>com.google.code.gson</groupId>-->
+            <!--<artifactId>gson</artifactId>-->
+            <!--<version>2.2.4</version>-->
+        <!--</dependency>-->
+
+        <!--<dependency>-->
+<!--<groupId>net.sf.json-lib</groupId>-->
+<!--<artifactId>json-lib</artifactId>-->
+<!--<version>2.4</version>-->
+<!--<classifier>jdk15</classifier>-->
+<!--</dependency>-->
+<!---->
+
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.api</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.launchpad.integration-tests</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.adapter</artifactId>
+            <version>2.0.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
+            <version>4.2.0</version>
+        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.osgi</groupId>-->
+            <!--<artifactId>org.osgi.compendium</artifactId>-->
+            <!--<version>1.4.0</version>-->
+        <!--</dependency>-->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>adapter-annotations</artifactId>
+            <version>1.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.scr.annotations</artifactId>
+            <scope>compile</scope>
+            <version>1.9.0</version>
+        </dependency>
+
+        <!--Cassandra-specific dependencies-->
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>2.4</version>
+        </dependency>
+        <dependency>
+            <groupId>me.prettyprint</groupId>
+            <artifactId>hector-core</artifactId>
+            <version>1.0-2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.5.11</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.5.11</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>1.3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <version>3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.api</artifactId>
+            <version>2.4.2</version>
+        </dependency>
+
+
+    </dependencies>
+
+</project>
diff --git a/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraConstants.java b/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraConstants.java
new file mode 100644
index 0000000..c86c291
--- /dev/null
+++ b/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraConstants.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.sling.cassandra.resource.provider;
+
+public class CassandraConstants {
+
+    /**
+     * The resource type used for Sling resources that belong to Cassandra.
+     */
+    public static final String CASSANDRA_RESOURCE_TYPE = "sling:cassandra";
+
+    /**
+     * Default cluster name used for Cassandra backend implementation for Sling
+     */
+    public static final String CASSANDRA_DEFAULT_CLUSTER_NAME = "sling_cassandra_cluster";
+
+
+}
diff --git a/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraIterable.java b/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraIterable.java
new file mode 100644
index 0000000..de53a32
--- /dev/null
+++ b/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraIterable.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.resource.provider;
+
+import org.apache.sling.api.resource.Resource;
+
+import java.util.Iterator;
+
+/**
+ * Sling Cassandra Iterable wrapper class, which wraps a java.util.Iterator of resources.
+ */
+public class CassandraIterable implements Iterable<Resource> {
+
+    private Iterator<Resource> iterator;
+
+    public CassandraIterable(Iterator<Resource> iterator){
+        this.iterator = iterator;
+    }
+
+    public Iterator<Resource> iterator() {
+        return iterator;
+    }
+}
diff --git a/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraResource.java b/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraResource.java
new file mode 100644
index 0000000..483deab
--- /dev/null
+++ b/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraResource.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.resource.provider;
+
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.Row;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.sling.adapter.annotations.Adaptable;
+import org.apache.sling.adapter.annotations.Adapter;
+import org.apache.sling.api.resource.*;
+import org.apache.sling.api.wrappers.ValueMapDecorator;
+import org.apache.sling.cassandra.resource.provider.mapper.CassandraMapperException;
+import org.apache.sling.cassandra.resource.provider.util.CassandraResourceProviderUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+@Adaptable(adaptableClass=Resource.class, adapters={
+    @Adapter({ValueMap.class})
+})
+public class CassandraResource extends AbstractResource {
+
+    private String resourcePath;
+    private String remainingPath;
+    private String columnFamilySector;
+    private CassandraResourceProvider resourceProvider;
+    private ResourceResolver resourceResolver;
+    private static Logger LOGGER = LoggerFactory.getLogger(CassandraResource.class);
+    private boolean dataLoaded = false;
+    private String resourceSuperType = "nt:supCassandra";
+    private String resourceType = "nt:casandra";
+    private String metadata = "resolutionPathInfo=json";
+    private final ValueMap valueMap;
+    private boolean isTransient = false;
+
+    static class CassandraValueMap extends ValueMapDecorator {
+        CassandraValueMap(String path) {
+            super(new HashMap<String, Object>());
+            put("path", path);
+        }
+    }
+
+    public CassandraResource(ResourceProvider resourceProvider, ResourceResolver resourceResolver, String resourcePath,ValueMap valueMap) {
+        this.resourceProvider = (CassandraResourceProvider) resourceProvider;
+        this.resourceResolver =  resourceResolver;
+        this.resourcePath = resourcePath;
+        this.remainingPath = CassandraResourceProviderUtil.getRemainingPath(resourcePath);
+        this.columnFamilySector = CassandraResourceProviderUtil.getColumnFamilySector(resourcePath);
+        this.valueMap = valueMap;
+    }
+
+     public CassandraResource(ResourceProvider resourceProvider, ResourceResolver resourceResolver, String resourcePath,ValueMap valueMap,Map<String, Object> stringObjectMap) {
+        this.resourceProvider = (CassandraResourceProvider) resourceProvider;
+        this.resourceResolver =  resourceResolver;
+        this.resourcePath = resourcePath;
+        this.remainingPath = CassandraResourceProviderUtil.getRemainingPath(resourcePath);
+        this.columnFamilySector = CassandraResourceProviderUtil.getColumnFamilySector(resourcePath);
+        this.valueMap = valueMap;
+        this.isTransient = true;
+//        this.metadata=stringObjectMap.get("metadata").toString();
+//        this.resourceType=stringObjectMap.get("resourceType").toString();
+//        this.resourceSuperType=stringObjectMap.get("resourceSuperType").toString();
+     }
+
+    private void loadResourceData(ResourceProvider resourceProvider) {
+        CassandraResourceProvider cassandraResourceProvider = (CassandraResourceProvider) resourceProvider;
+        try {
+//          TODO ColumnFamilySector is NULL and hence this..
+            String cql = cassandraResourceProvider.getCassandraMapperMap().get(columnFamilySector).getCQL(columnFamilySector, remainingPath);
+            QueryResult<CqlRows<String, String, String>> results = CassandraResourceProviderUtil.executeQuery(cql, ((CassandraResourceProvider) resourceProvider).getKeyspace(), new StringSerializer());
+            populateDataFromResult(results);
+            dataLoaded = true;
+        } catch (CassandraMapperException e) {
+            System.out.println("Error occurred from resource at " + resourcePath + " : " + e.getMessage());
+            LOGGER.error("Error occurred from resource at " + resourcePath + " : " + e.getMessage());
+        }
+    }
+
+    private void populateDataFromResult(QueryResult<CqlRows<String, String, String>> result) {
+        for (Row<String, String, String> row : result.get().getList()) {
+            for (HColumn column : row.getColumnSlice().getColumns()) {
+                //  Assumed Only one result, since key is unique
+                if (column.getName().equals("metadata")) {
+                    this.metadata = column.getValue().toString();
+                } else if (column.getName().equals("resourceSuperType")) {
+                    this.resourceSuperType = column.getValue().toString();
+                } else if (column.getName().equals("resourceType")) {
+                    this.resourceType = column.getValue().toString();
+                }
+            }
+        }
+    }
+
+    public String getPath() {
+        return resourcePath;
+    }
+
+    @Override
+    public String getName() {
+        return CassandraResourceProviderUtil.getNameFromPath(resourcePath);
+    }
+
+    @Override
+    public Resource getParent() {
+        return new CassandraResource(this.resourceProvider,
+                this.resourceResolver,
+                CassandraResourceProviderUtil.getParentPath(resourcePath),valueMap);
+    }
+
+    @Override
+    public Iterator<Resource> listChildren() {
+        List<Resource> children = new ArrayList<Resource>();
+        try {
+            QueryResult<CqlRows<String, String, String>> result = CassandraResourceProviderUtil.getAllNodes(
+                    resourceProvider.getKeyspace(),
+                    CassandraResourceProviderUtil.getColumnFamilySector(resourcePath));
+            for (Row<String, String, String> row : result.get().getList()) {
+                for (HColumn column : row.getColumnSlice().getColumns()) {
+                    if ("path".equals(column.getName()) && CassandraResourceProviderUtil.isAnImmediateChild(resourcePath, column.getValue().toString())) {
+                        children.add( new CassandraResource(resourceProvider,resourceResolver,column.getValue().toString(),valueMap));
+                    }
+                }
+            }
+        } catch (Exception e) {
+            System.out.println("Error occurred while getting child nodes " + e.getMessage());
+            LOGGER.error("Error occurred while getting child nodes " + e.getMessage());
+        }
+        return children.iterator();
+    }
+
+    @Override
+    public Iterable<Resource> getChildren() {
+        return new CassandraIterable(listChildren());
+    }
+
+    @Override
+    public Resource getChild(String s) {
+       return resourceProvider.getResource(resourceResolver,new StringBuilder(resourcePath.endsWith("/") ? resourcePath.substring(0, resourcePath.length() - 1)
+                        : resourcePath)
+                        .append("/").append(s).toString());
+//        return new CassandraResource(
+//                this.resourceProvider,
+//                this.resourceResolver,
+//                new StringBuilder(resourcePath.endsWith("/") ? resourcePath.substring(0, resourcePath.length() - 1)
+//                        : resourcePath)
+//                        .append("/").append(s).toString(),valueMap);
+    }
+
+    public String getResourceType() {
+        if(!isTransient) {
+            loadResourceData(resourceProvider);
+        }
+        return this.resourceType;
+    }
+
+    public String getResourceSuperType() {
+        if(!isTransient) {
+            loadResourceData(resourceProvider);
+        }
+        return this.resourceSuperType;
+    }
+
+    @Override
+    public boolean isResourceType(String s) {
+        return super.isResourceType(s);
+    }
+
+    public ResourceMetadata getResourceMetadata() {
+        if(!isTransient) {
+            loadResourceData(resourceProvider);
+        }
+//       Expected format of metadata is a String, e.g. "characterEncoding=UTF-8,resolutionPathInfo=.html"
+        if(metadata == null || "".equals(metadata) || metadata.split(",").length == 0) {
+            ResourceMetadata resourceMetadata = new ResourceMetadata();
+            resourceMetadata.setModificationTime(System.currentTimeMillis());
+            resourceMetadata.setResolutionPath(resourcePath);
+            return resourceMetadata;
+        }
+        ResourceMetadata resourceMetadata = new ResourceMetadata();
+
+        for(String ele:metadata.split(",")) {
+            String key=ele.split("=")[0].trim();
+            String value=ele.split("=")[1].trim();
+
+          if("characterEncoding".equalsIgnoreCase(key)) {
+             resourceMetadata.setCharacterEncoding(value);
+          } else if("contentType".equalsIgnoreCase(key)) {
+              resourceMetadata.setContentType(value);
+          } else if("contentLength".equalsIgnoreCase(key)) {
+              resourceMetadata.setContentLength(Integer.valueOf(value));
+          } else if("resolutionPathInfo".equalsIgnoreCase(key)) {
+              resourceMetadata.setResolutionPathInfo(value);
+          }
+        }
+
+        resourceMetadata.setModificationTime(System.currentTimeMillis());
+        resourceMetadata.setResolutionPath(resourcePath);
+        return resourceMetadata;
+    }
+
+    public ResourceResolver getResourceResolver() {
+        return this.resourceResolver;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+  public <AdapterType> AdapterType adaptTo(Class<AdapterType> type) {
+        if(type == ValueMap.class) {
+            return (AdapterType)valueMap;
+        } else if (type == ModifiableValueMap.class) {
+            return (AdapterType)valueMap;
+        }
+        return super.adaptTo(type);
+    }
+    @Override
+    public String toString(){
+      return new StringBuilder("{\nresourcePath=").append(resourcePath).
+                  append("\n    remainingPath=").append(remainingPath).
+                  append("\n    columnFamilySector=").append(columnFamilySector).
+                  append("\n    resourceSuperType=").append(resourceSuperType).
+                  append("\n    resourceType=").append(resourceType).
+                  append("\n    metaData=").append(metadata).append("\n}").toString();
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraResourceProvider.java b/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraResourceProvider.java
new file mode 100644
index 0000000..637a737
--- /dev/null
+++ b/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraResourceProvider.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.resource.provider;
+
+import org.apache.commons.codec.binary.Base64;
+import me.prettyprint.cassandra.model.CqlQuery;
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.service.ThriftKsDef;
+import me.prettyprint.hector.api.Cluster;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.felix.scr.annotations.*;
+import org.apache.sling.api.resource.*;
+import org.apache.sling.cassandra.resource.provider.mapper.CassandraMapper;
+import org.apache.sling.cassandra.resource.provider.mapper.DefaultCassandraMapperImpl;
+import org.apache.sling.cassandra.resource.provider.util.CassandraResourceProviderUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Arrays;
+import java.util.Iterator;
+
+
+@Component(immediate = true, metatype = true)
+@Service(ResourceProvider.class)
+@Properties({
+        @Property(name = "provider.roots", value = {"/content/cassandra"})
+})
+public class CassandraResourceProvider implements ResourceProvider, ModifyingResourceProvider {
+
+    private Cluster cluster;
+    private Keyspace keyspace;
+    private String cf = "pictures";
+    private int replicationFactor = 1;
+    private final static String KEYSPACE = "sling_cassandra";
+    private String CASSANDRA_EP = "localhost:9160";
+    private ConcurrentHashMap<String, CassandraMapper> cassandraMapperMap
+            = new ConcurrentHashMap<String, CassandraMapper>();
+    private static Logger LOGGER = LoggerFactory.getLogger(CassandraResourceProvider.class);
+    private static final Map<String, ValueMap> CASSANDRA_MAP = new HashMap<String, ValueMap>();
+    private static final Map<String, Map> TRANSIENT_CREATE_MAP = new HashMap<String, Map>();
+    private static final Map<String, Map> TRANSIENT_DELETE_MAP = new HashMap<String, Map>();
+
+
+    public CassandraResourceProvider() {
+        init();
+    }
+
+    public Keyspace getKeyspace() {
+        return keyspace;
+    }
+
+    public void setColumnFamily(String cf) {
+        this.cf = cf;
+    }
+
+    private synchronized void init() {
+        LOGGER.debug("Initializing Cassandra resource provider");
+        cluster = HFactory.getOrCreateCluster(CassandraConstants.CASSANDRA_DEFAULT_CLUSTER_NAME, CASSANDRA_EP);
+        KeyspaceDefinition keyspaceDef = cluster.describeKeyspace(KEYSPACE);
+        if (keyspaceDef == null) {
+            try {
+                createSchema(cluster);
+            } catch (Exception ignore) {
+                // Schema creation can fail if the keyspace already exists; log and continue.
+                LOGGER.debug("Ignoring the exception when trying to create an already existing schema " + ignore.getMessage());
+            }
+        }
+        this.keyspace = HFactory.createKeyspace(KEYSPACE, cluster);
+        LOGGER.debug("Cassandra resource provider initialized");
+
+    }
+
+    public Resource getResource(ResourceResolver resourceResolver, javax.servlet.http.HttpServletRequest httpServletRequest, String s) {
+        return getResource(resourceResolver, s);
+    }
+
+    public Resource getResource(ResourceResolver resourceResolver, String s) {
+        // Shallow paths (at most three segments), other than the provider root, resolve to a synthetic resource.
+        if(s.startsWith("/") && !s.equals("/content/cassandra") && s.split("/").length <= 4){
+            return new CassandraResource(this, resourceResolver, s, new CassandraResource.CassandraValueMap(s),new HashMap<String, Object>());
+        }
+        LOGGER.debug("Looking up resource at path " + s);
+
+        if (CASSANDRA_MAP.get(s) == null) {
+            return null;
+        }
+        // Populate the map of Cassandra mappers on first use of a column family sector.
+        if (CassandraResourceProviderUtil.getColumnFamilySector(s) != null && !getCassandraMapperMap().containsKey(CassandraResourceProviderUtil.getColumnFamilySector(s))) {
+            getCassandraMapperMap().put(CassandraResourceProviderUtil.getColumnFamilySector(s), new DefaultCassandraMapperImpl());
+        }
+        try {
+            return new CassandraResource(this, resourceResolver, s, CASSANDRA_MAP.get(s));
+//            return CassandraResourceProviderUtil.isResourceExists(this,keyspace,s) ? new CassandraResource(this,resourceResolver, s,CASSANDRA_MAP.get(s)) : null;
+        } catch (Exception e) {
+            // Resource construction failed; log with the stack trace and
+            // fall through to return null.
+            LOGGER.error("Error at Provider " + e.getMessage(), e);
+        }
+        return null;
+    }
+
+    public Iterator<Resource> listChildren(Resource resource) {
+        return resource.listChildren();
+    }
+
+    private void createSchema(Cluster myCluster) {
+        ColumnFamilyDefinition cfDef = HFactory.createColumnFamilyDefinition(KEYSPACE, cf, ComparatorType.BYTESTYPE);
+        KeyspaceDefinition newKeyspace = HFactory.createKeyspaceDefinition(
+                KEYSPACE,
+                ThriftKsDef.DEF_STRATEGY_CLASS,
+                replicationFactor,
+                Arrays.asList(cfDef));
+        myCluster.addKeyspace(newKeyspace, true);
+    }
+
+    public ConcurrentHashMap<String, CassandraMapper> getCassandraMapperMap() {
+        return cassandraMapperMap;
+    }
+
+    static {
+        defineCassandra("/content/cassandra/p1/c1");
+        defineCassandra("/content/cassandra/p1/c1/c2");
+        defineCassandra("/content/cassandra/nK/1");
+        loadMapWithTestData();
+    }
+
+    private static void loadMapWithTestData() {
+        String[] nodes = new String[]{"A", "B", "C", "D", "E", "F"};
+        for (String node : nodes) {
+            int exerciseCount = 0;
+            int testCount = 0;
+            for (int i = 0; i < 1000; i++) {
+                String path = "/content/cassandra/" + node + "/" + i;
+                if (i % 2 == 0) {
+                    if (exerciseCount <= 100) {
+                        defineCassandra(path);
+                        exerciseCount++;
+                    }
+                } else {
+                    if (testCount <= 100) {
+                        defineCassandra(path);
+                        testCount++;
+                    }
+                }
+            }
+
+        }
+    }
+
+
+    private static ValueMap defineCassandra(String path) {
+        final ValueMap valueMap = new CassandraResource.CassandraValueMap(path);
+        CASSANDRA_MAP.put(path, valueMap);
+        return valueMap;
+    }
+
+
+    /**
+     * Modification methods for Cassandra Resource Provider
+     */
+
+    @Override
+    public Resource create(ResourceResolver resourceResolver, String s, Map<String, Object> stringObjectMap) throws PersistenceException {
+        LOGGER.debug("Creating resource at path " + s);
+        TRANSIENT_CREATE_MAP.put(s, stringObjectMap);
+        commit(resourceResolver);
+//        return getResource(resourceResolver,s);
+        return new CassandraResource(this, resourceResolver, s, new CassandraResource.CassandraValueMap(s),stringObjectMap);
+    }
+
+    @Override
+    public void delete(ResourceResolver resourceResolver, String s) throws PersistenceException {
+        if (TRANSIENT_CREATE_MAP.get(s) != null) {
+            TRANSIENT_DELETE_MAP.put(s, TRANSIENT_CREATE_MAP.get(s));
+            TRANSIENT_CREATE_MAP.remove(s);
+        }
+
+        if (CASSANDRA_MAP.get(s) != null) {
+            TRANSIENT_DELETE_MAP.put(s, CASSANDRA_MAP.get(s));
+        }
+    }
+
+    @Override
+    public void revert(ResourceResolver resourceResolver) {
+        TRANSIENT_CREATE_MAP.clear();
+        TRANSIENT_DELETE_MAP.clear();
+    }
+
+    @Override
+    public void commit(ResourceResolver resourceResolver) throws PersistenceException {
+        for (Map.Entry<String, Map> entry : TRANSIENT_CREATE_MAP.entrySet()) {
+            if (entry.getKey() != null && entry.getValue() != null) {
+                if(insertResource(entry.getKey(), entry.getValue())){
+                    defineCassandra(entry.getKey());
+                }
+            }
+        }
+
+        for (Map.Entry<String, Map> entry : TRANSIENT_DELETE_MAP.entrySet()) {
+            if(deleteResource(resourceResolver, entry.getKey())){
+                CASSANDRA_MAP.remove(entry.getKey());
+            }
+        }
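+        revert(resourceResolver); // Changes are persisted; clear the transient maps so hasChanges() reports false.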
+    }
+
+    private boolean deleteResource(ResourceResolver resourceResolver, String path) throws PersistenceException {
+        try {
+            String key = getrowID(path);
+            if (key == null) {
+                return false;
+            }
+            String _cf = CassandraResourceProviderUtil.getColumnFamilySector(path);
+            createColumnFamily(_cf, this.getKeyspace(), new StringSerializer());
+
+            StringSerializer se = new StringSerializer();
+            CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+            String query = "delete FROM " + _cf + " where KEY = '" + key + "';";
+            cqlQuery.setQuery(query);
+            QueryResult<CqlRows<String, String, String>> result = cqlQuery.execute();
+        } catch (NoSuchAlgorithmException e) {
+            throw new PersistenceException(e.getMessage());
+        } catch (UnsupportedEncodingException e) {
+            throw new PersistenceException(e.getMessage());
+        }
+        return true;
+
+    }
+
+    private boolean insertResource(String path, Map map) throws PersistenceException {
+        try {
+            String r = getrowID(path);
+            if (r == null) {
+                return false;
+            }
+            String _cf = CassandraResourceProviderUtil.getColumnFamilySector(path);
+            createColumnFamily(_cf, this.getKeyspace(), new StringSerializer());
+            String metadata = map.get("metadata") == null? "resolutionPathInfo=json":(String)map.get("metadata");
+            String resourceType =  map.get("resourceType") == null?"nt:cassandra":(String)map.get("resourceType");
+            String resourceSuperType =  map.get("resourceSuperType") == null?"nt:superCassandra":(String) map.get("resourceSuperType");
+
+            addData(this.getKeyspace(), _cf, new StringSerializer(),
+                    new String[]{
+                            "'" + r + "','" + path + "','" + resourceType + "','" + resourceSuperType + "','" + metadata + "'",
+                    });
+        } catch (NoSuchAlgorithmException e) {
+            throw new PersistenceException(e.getMessage());
+        } catch (UnsupportedEncodingException e) {
+            throw new PersistenceException(e.getMessage());
+        }
+        return true;
+    }
+
+    private void addData(Keyspace keyspace, String cf, StringSerializer se, String[] dataArray) {
+        for (String data : dataArray) {
+            CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+            String query = "insert into " + cf + " (KEY,path,resourceType,resourceSuperType,metadata) values (" + data + ");";
+            cqlQuery.setQuery(query);
+            QueryResult<CqlRows<String, String, String>> result = cqlQuery.execute();
+            LOGGER.info("Added " + result.toString());
+        }
+    }
+
+    private void createColumnFamily(String cf, Keyspace keyspace, StringSerializer se) {
+        String createCF = "CREATE COLUMNFAMILY " + cf + " (KEY varchar PRIMARY KEY,path varchar,resourceType varchar,resourceSuperType varchar,metadata varchar);";
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+        cqlQuery.setQuery(createCF);
+        try {
+            QueryResult<CqlRows<String, String, String>> result1 = cqlQuery.execute();
+            LOGGER.info(result1.get().getList().size() + " Finished.!");
+        } catch (HInvalidRequestException ignore) {
+            LOGGER.debug("Column Family already exists " + ignore.getMessage());
+        }
+    }
+
+    private String getrowID(String path) throws NoSuchAlgorithmException, UnsupportedEncodingException {
+        MessageDigest md = MessageDigest.getInstance("SHA1");
+        String remPath = CassandraResourceProviderUtil.getRemainingPath(path);
+        if (remPath == null) {
+            return null;
+        } else {
+            String rowID = new String(Base64.encodeBase64(md.digest(remPath.getBytes("UTF-8"))));
+            return rowID;
+        }
+    }
+
+    @Override
+    public boolean hasChanges(ResourceResolver resourceResolver) {
+        if (TRANSIENT_CREATE_MAP.isEmpty() && TRANSIENT_DELETE_MAP.isEmpty()) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraResourceResolver.java b/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraResourceResolver.java
new file mode 100644
index 0000000..46b1834
--- /dev/null
+++ b/src/main/java/org/apache/sling/cassandra/resource/provider/CassandraResourceResolver.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.resource.provider;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.Iterator;
+import java.util.Map;
+
+public class CassandraResourceResolver implements ResourceResolver {
+
+    public Resource resolve(HttpServletRequest httpServletRequest, String s) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Resource resolve(String s) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Resource resolve(HttpServletRequest httpServletRequest) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public String map(String s) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public String map(HttpServletRequest httpServletRequest, String s) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Resource getResource(String s) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Resource getResource(Resource resource, String s) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public String[] getSearchPath() {
+        return new String[0];  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Iterator<Resource> listChildren(Resource resource) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Iterable<Resource> getChildren(Resource resource) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Iterator<Resource> findResources(String s, String s1) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Iterator<Map<String, Object>> queryResources(String s, String s1) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public ResourceResolver clone(Map<String, Object> stringObjectMap) throws LoginException {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public boolean isLive() {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void close() {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public String getUserID() {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Iterator<String> getAttributeNames() {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Object getAttribute(String s) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void delete(Resource resource) throws PersistenceException {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Resource create(Resource resource, String s, Map<String, Object> stringObjectMap) throws PersistenceException {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void revert() {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void commit() throws PersistenceException {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public boolean hasChanges() {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public String getParentResourceType(Resource resource) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public String getParentResourceType(String s) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public boolean isResourceType(Resource resource, String s) {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void refresh() {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public <AdapterType> AdapterType adaptTo(Class<AdapterType> adapterTypeClass) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+}
diff --git a/src/main/java/org/apache/sling/cassandra/resource/provider/mapper/CassandraMapper.java b/src/main/java/org/apache/sling/cassandra/resource/provider/mapper/CassandraMapper.java
new file mode 100644
index 0000000..d1f30d5
--- /dev/null
+++ b/src/main/java/org/apache/sling/cassandra/resource/provider/mapper/CassandraMapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.resource.provider.mapper;
+
+/**
+ * Extension point for Cassandra mappers: an implementation returns the CQL query for a given column family and path.
+ */
+public interface CassandraMapper {
+    /**
+     * Builds the CQL query used to look up a resource.
+     * @param columnFamilySelector - Corresponding Cassandra column family name
+     * @param path - Absolute path to the Sling resource
+     * @return - String value of the appropriate CQL based on columnFamilySelector and path
+     */
+    String getCQL(String columnFamilySelector, String path) throws CassandraMapperException;
+}
diff --git a/src/main/java/org/apache/sling/cassandra/resource/provider/mapper/CassandraMapperException.java b/src/main/java/org/apache/sling/cassandra/resource/provider/mapper/CassandraMapperException.java
new file mode 100644
index 0000000..bbd3d65
--- /dev/null
+++ b/src/main/java/org/apache/sling/cassandra/resource/provider/mapper/CassandraMapperException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.resource.provider.mapper;
+
+public class CassandraMapperException extends Exception {
+
+    public CassandraMapperException() {
+        super();
+    }
+
+    public CassandraMapperException(String s) {
+        super(s);
+    }
+
+    public CassandraMapperException(java.lang.String s, java.lang.Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public CassandraMapperException(java.lang.Throwable throwable) {
+        super(throwable);
+    }
+
+
+}
diff --git a/src/main/java/org/apache/sling/cassandra/resource/provider/mapper/DefaultCassandraMapperImpl.java b/src/main/java/org/apache/sling/cassandra/resource/provider/mapper/DefaultCassandraMapperImpl.java
new file mode 100644
index 0000000..f6e1f90
--- /dev/null
+++ b/src/main/java/org/apache/sling/cassandra/resource/provider/mapper/DefaultCassandraMapperImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.resource.provider.mapper;
+
+
+import org.apache.commons.codec.binary.Base64;
+
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ *  This class builds the CQL query that selects a resource by its row key, the primary key under which the resource is stored in Cassandra.
+ *  The key is generated by hashing the UTF-8 bytes of the path string with SHA-1 and Base64-encoding the digest.
+ */
+public class DefaultCassandraMapperImpl implements CassandraMapper {
+
+    public String getCQL(String columnFamilySelector, String path) throws CassandraMapperException {
+        MessageDigest md = null;
+        try {
+            md = MessageDigest.getInstance("SHA1");
+            String rowID = new String(Base64.encodeBase64(md.digest(path.getBytes("UTF-8"))));
+            return "select * from "+columnFamilySelector+" where KEY = '"+rowID+"'";
+        } catch (NoSuchAlgorithmException e) {
+            throw new CassandraMapperException(e.getMessage(), e);
+        } catch (UnsupportedEncodingException e) {
+            throw new CassandraMapperException(e.getMessage(), e);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/cassandra/resource/provider/security/AccessControlUtil.java b/src/main/java/org/apache/sling/cassandra/resource/provider/security/AccessControlUtil.java
new file mode 100644
index 0000000..6136325
--- /dev/null
+++ b/src/main/java/org/apache/sling/cassandra/resource/provider/security/AccessControlUtil.java
@@ -0,0 +1,221 @@
+package org.apache.sling.cassandra.resource.provider.security;
+
+import org.apache.commons.codec.binary.Base64;
+import me.prettyprint.cassandra.model.CqlQuery;
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.Row;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceProvider;
+import org.apache.sling.cassandra.resource.provider.util.CassandraResourceProviderUtil;
+import org.apache.sling.commons.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+public class AccessControlUtil {
+    private static Logger LOGGER = LoggerFactory.getLogger(AccessControlUtil.class);
+
+    private static final String ACL_CF = "sling_acl";
+    private static final String ACE_SEPARATOR = "_";
+
+    private static final int READ = 0x01;
+    private static final int WRITE = 0x02;
+    private static final int DELETE = 0x04;
+    private static final String _READ = "READ";
+    private static final String _WRITE = "WRITE";
+    private static final String _DELETE = "DELETE";
+    private static final String DEFAULT_SYSTEM_ACE = "0_everyone_allow : 0x01";
+
+    private CassandraResourceProvider provider;
+
+
+    public AccessControlUtil(CassandraResourceProvider provider) {
+        this.provider = provider;
+    }
+
+    public AccessControlUtil() {
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        m();
+    }
+
+
+    private static void m() {
+
+        AccessControlUtil accessControlUtil = new AccessControlUtil(new CassandraResourceProvider());
+
+        String path = "/content/cassandra/p2/c2";
+        String policy = "0_dishara_allow : " + READ;
+        Set<String> set = new HashSet<String>();
+        set.add("dishara");
+        try {
+
+            accessControlUtil.addACE(path, policy);
+
+            int results[] = accessControlUtil.buildAtLevel(set, path);
+            System.out.println("GRANT" + results[0]);
+            System.out.println("DENY" + results[1]);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private int getPrivilegeFromACE(String ace) {
+        return Integer.decode(ace.split(":")[1].trim());
+    }
+
+    private String getPrincipleFromACE(String ace) {
+        return ace.split(":")[0].trim().split(ACE_SEPARATOR)[1].trim();
+    }
+
+    private boolean isACEGrant(String ace) {
+        return "allow".equalsIgnoreCase(ace.split(":")[0].trim().split(ACE_SEPARATOR)[2].trim());
+    }
+
+    private boolean isUserHasPrinciple(String principle,Set<String> principles){
+        return principles.contains(principle);
+    }
+
+    private int[] getCurrentLevelBitmaps(Set<String> userPrinciples, String currentPath) throws Exception {
+        int grants = 0;
+        int denies = 0;
+
+        String[] aclArray = getACL(currentPath);
+        if (aclArray == null) {
+            return new int[]{grants, denies};
+        }
+        for (String ace : aclArray) {
+            if (isUserHasPrinciple(getPrincipleFromACE(ace),userPrinciples)){
+                int toGrant = 0;
+                int toDeny = 0;
+                if (isACEGrant(ace)) {
+                    toGrant = getPrivilegeFromACE(ace);
+                } else {
+                    toDeny = getPrivilegeFromACE(ace);
+                }
+                toGrant = toGrant & ~denies;
+                toDeny = toDeny & ~grants;
+                grants = grants | toGrant;
+                denies = denies | toDeny;
+            }
+        }
+        return new int[]{grants, denies};
+    }
+
+    private int[] buildAtLevel(Set<String> userPrinciples, String path) throws Exception {
+        int[] parentPermission = new int[]{0,0};
+        int[] permissions = new int[]{0,0};
+        if (!"/".equals(path)) {
+            parentPermission = buildAtLevel(userPrinciples, CassandraResourceProviderUtil.getParentPath(path));
+        }
+        permissions = getCurrentLevelBitmaps(userPrinciples, path);
+        int toGrant = parentPermission[0] & ~permissions[1];
+        int toDeny = parentPermission[1] & ~permissions[0];
+        permissions[0] = permissions[0] | toGrant;
+        permissions[1] = permissions[1] | toDeny;
+        return permissions;
+    }
+
+
+    private void addACE(String path, String policy) throws Exception {
+        String rid = getrowID(path);
+        createColumnFamily(ACL_CF, provider.getKeyspace(), new StringSerializer());
+        String getAllACEs = "select * from " + ACL_CF + " where KEY = '" + rid + "'";
+        QueryResult<CqlRows<String, String, String>> results = CassandraResourceProviderUtil.executeQuery(getAllACEs, provider.getKeyspace(), new StringSerializer());
+        if (CassandraResourceProviderUtil.recordExists(results, "policy")) {
+            updateACL(rid, policy, new StringSerializer(), results);
+        } else {
+            addACL(rid, policy, new StringSerializer());
+        }
+    }
+
+
+    private String[] getACL(String path) throws Exception {
+        if (getCassandraSystemNodeACL(path) != null) {
+            return getCassandraSystemNodeACL(path);
+        }
+        String rid = getrowID(path);
+        createColumnFamily(ACL_CF, provider.getKeyspace(), new StringSerializer());
+        String getAllACEs = "select * from " + ACL_CF + " where KEY = '" + rid + "'";
+        QueryResult<CqlRows<String, String, String>> results = CassandraResourceProviderUtil.executeQuery(getAllACEs, provider.getKeyspace(), new StringSerializer());
+        String policy = null;
+        for (Row<String, String, String> row : ((CqlRows<String, String, String>) results.get()).getList()) {
+            for (HColumn column : row.getColumnSlice().getColumns()) {
+                if ("policy".equalsIgnoreCase(column.getName().toString()) && column.getValue() != null) {
+                    policy = column.getValue().toString();
+                }
+            }
+        }
+        return policy != null ? policy.split(";") : null;
+    }
+
+    private String[] getCassandraSystemNodeACL(String path){
+        if ("/content".equals(path) || "/content/cassandra".equals(path) || "/".equals(path)) {
+            return new String[]{DEFAULT_SYSTEM_ACE};
+        } else {
+            return null;
+        }
+    }
+
+
+
+    private void updateACL(String rid, String policy, StringSerializer se, QueryResult<CqlRows<String, String, String>> results) {
+        String oldACL = "";
+        for (Row<String, String, String> row : ((CqlRows<String, String, String>) results.get()).getList()) {
+            for (HColumn column : row.getColumnSlice().getColumns()) {
+                if ("policy".equalsIgnoreCase(column.getName().toString()) && column.getValue() != null) {
+                    oldACL = column.getValue().toString();
+                }
+            }
+        }
+
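+        // Append the new entry; if no policy text is stored yet, the new entry stands alone
+        // instead of being dropped.
+        oldACL = oldACL.isEmpty() ? policy : oldACL + ";" + policy;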
+
+        addACL(rid, oldACL, new StringSerializer());
+
+    }
+
+    private void addACL(String rid, String policy, StringSerializer se) {
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(provider.getKeyspace(), se, se, se);
+        String data = "'" + rid + "'" + "," + "'" + policy + "'";
+        String query = "insert into " + ACL_CF + " (KEY,policy) values (" + data + ");";
+        cqlQuery.setQuery(query);
+        QueryResult<CqlRows<String, String, String>> result = cqlQuery.execute();
+        LOGGER.debug("Added ACL for row " + rid);
+    }
+
+    private String getrowID(String path) throws NoSuchAlgorithmException, UnsupportedEncodingException {
+        MessageDigest md = MessageDigest.getInstance("SHA1");
+        String remainingPath = CassandraResourceProviderUtil.getRemainingPath(path);
+        return new String(Base64.encodeBase64(md.digest((remainingPath != null ? remainingPath : "TEST").getBytes("UTF-8"))));
+    }
+
+    private void createColumnFamily(String cf, Keyspace keyspace, StringSerializer se) {
+        String createCF = "CREATE COLUMNFAMILY " + cf + " (KEY varchar PRIMARY KEY,policy varchar);";
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+        cqlQuery.setQuery(createCF);
+        try {
+            QueryResult<CqlRows<String, String, String>> result1 = cqlQuery.execute();
+            LOGGER.info("Created column family " + cf + "; rows returned: " + result1.get().getList().size());
+        } catch (HInvalidRequestException ignore) {
+            LOGGER.debug("Column Family already exists " + ignore.getMessage());
+        }
+    }
+
+
+}
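Review note on the ACL statements above: `addACL` splices `rid` and `policy` directly into the CQL text. The row id is Base64 and contains no quotes, but a policy containing a single quote would end the string literal early. Below is a minimal sketch of one way to guard against that, assuming the CQL convention that a quote inside a string literal is written as two quotes. The names `CqlLiteralSketch`, `quote` and `insertPolicy` are hypothetical, and the column family name `acl` in the example is an assumption, not the value of `ACL_CF`.

```java
// Hypothetical helper, not part of this patch.
class CqlLiteralSketch {

    // Wraps a value in single quotes, doubling any embedded single quote.
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Builds the same insert statement shape as addACL, with quoted values.
    static String insertPolicy(String columnFamily, String rowId, String policy) {
        return "insert into " + columnFamily + " (KEY,policy) values ("
                + quote(rowId) + "," + quote(policy) + ");";
    }

    public static void main(String[] args) {
        System.out.println(insertPolicy("acl", "abc=", "it's"));
    }
}
```

Whether the CQL version spoken through Hector accepts the doubled quote should be checked against the target Cassandra release before relying on this.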
diff --git a/src/main/java/org/apache/sling/cassandra/resource/provider/util/CassandraResourceProviderUtil.java b/src/main/java/org/apache/sling/cassandra/resource/provider/util/CassandraResourceProviderUtil.java
new file mode 100644
index 0000000..80ec67a
--- /dev/null
+++ b/src/main/java/org/apache/sling/cassandra/resource/provider/util/CassandraResourceProviderUtil.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.resource.provider.util;
+
+import org.apache.commons.codec.binary.Base64;
+import me.prettyprint.cassandra.model.CqlQuery;
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.Row;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceProvider;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraResourceProviderUtil {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraResourceProviderUtil.class);
+
+    public ResourceResolver getResourceResolver() {
+        return resourceResolver;
+    }
+
+    @Reference
+    private ResourceResolver resourceResolver;
+
+
+    public static QueryResult<CqlRows<String, String, String>> executeQuery(String query, Keyspace keyspace, StringSerializer se) {
+        // Row key, column name and column value are all handled as strings.
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+        cqlQuery.setQuery(query);
+        return cqlQuery.execute();
+    }
+
+
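+    /**
+     * Returns the path below /content/cassandra/{columnFamily}/, for example
+     * "c1/c2" for "/content/cassandra/p1/c1/c2", or null if there is none.
+     */
+    public static String getRemainingPath(String path) {
+        String[] segments = path.split("/");
+        if (path.startsWith("/") && segments.length > 4) {
+            // Length of "/seg1/seg2/seg3/": three segments plus four separators.
+            int prefixLength = segments[1].length() + segments[2].length() + segments[3].length() + 4;
+            return path.substring(prefixLength);
+        } else {
+            return null;
+        }
+    }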
+
+    public static String getColumnFamilySector(String path) {
+        if (path.startsWith("/") && path.split("/").length > 4) {
+            return path.split("/")[3];
+        } else {
+            return null;
+        }
+    }
+
+    public static void logResults(QueryResult<CqlRows<String, String, String>> result) {
+        LOGGER.info("############# RESULT ################");
+        for (Row<String, String, String> row : ((CqlRows<String, String, String>) result.get()).getList()) {
+            for (HColumn column : row.getColumnSlice().getColumns()) {
+                LOGGER.info(column.getValue().toString());
+            }
+            LOGGER.info("------------------------------------------------------------------------------------");
+        }
+        LOGGER.info("############# RESULT ################");
+    }
+
+    public static boolean recordExists(QueryResult<CqlRows<String, String, String>> result,String key) {
+        boolean exists = false;
+        for (Row<String, String, String> row : (result.get()).getList()) {
+            for (HColumn column : row.getColumnSlice().getColumns()) {
+                if (column.getName().equals(key) && column.getValue() != null) {
+                    exists = true;
+                    break;
+                }
+            }
+        }
+        return exists;
+    }
+
+    public static boolean isResourceExists(CassandraResourceProvider resourceProvider, Keyspace keyspace, String path) throws Exception {
+        String cf = getColumnFamilySector(path);
+        String cql = resourceProvider.getCassandraMapperMap().get(cf).getCQL(cf, getRemainingPath(path));
+        return recordExists(executeQuery(cql, keyspace, new StringSerializer()), "policy");
+    }
+
+    public static String getrowID(String path) throws NoSuchAlgorithmException, UnsupportedEncodingException {
+        MessageDigest md = MessageDigest.getInstance("SHA1");
+        String rowID = new String(Base64.encodeBase64(md.digest(getRemainingPath(path).getBytes("UTF-8"))));
+        return rowID;
+    }
+
+    public static String getParentPath(String path) {
+        if (path.lastIndexOf("/") == 0) {
+            return "/";
+        }
+        return path.substring(0,path.lastIndexOf("/"));
+    }
+
+    public static String getNameFromPath(String path) {
+        return path.substring(path.lastIndexOf("/") + 1);
+    }
+
+
+    public static QueryResult<CqlRows<String, String, String>> getAllNodes(Keyspace keyspace, String cf) throws Exception {
+        String cql = "select * from " + cf;
+        return CassandraResourceProviderUtil.executeQuery(cql, keyspace, new StringSerializer());
+    }
+
+
+    public static boolean isAnImmediateChild(String path, String s) {
+        if (s.startsWith(path + "/")) { // s must actually lie under path
+            return !s.substring(path.length() + 1).contains("/");
+        }
+        return false;
+    }
+
+  //    row.getColumnSlice().getColumnByName("metadata").getValue()
+
+}
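Review note on the path helpers above: the provider appears to treat the third path segment as the column family and to hash everything below it into the row key. A minimal standalone sketch of that scheme follows. The class name `RowIdSketch` and its method names are hypothetical, and `java.util.Base64` (Java 8+) stands in for the commons-codec `Base64` used in the patch, so read it as an illustration under those assumptions rather than as the provider's code.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical standalone class; mirrors getColumnFamilySector, getRemainingPath and getrowID.
class RowIdSketch {

    // "/content/cassandra/p1/c1/c2" -> "p1"; null when nothing follows the column family.
    static String columnFamily(String path) {
        String[] segments = path.split("/");
        return path.startsWith("/") && segments.length > 4 ? segments[3] : null;
    }

    // "/content/cassandra/p1/c1/c2" -> "c1/c2"; null when nothing follows the column family.
    static String remainingPath(String path) {
        String[] segments = path.split("/");
        if (!path.startsWith("/") || segments.length <= 4) {
            return null;
        }
        // Three leading segments plus four "/" separators.
        int prefixLength = segments[1].length() + segments[2].length() + segments[3].length() + 4;
        return path.substring(prefixLength);
    }

    // Row key: Base64 of the SHA-1 digest of the remaining path.
    static String rowId(String path) {
        String remaining = remainingPath(path);
        if (remaining == null) {
            throw new IllegalArgumentException("No path below the column family: " + path);
        }
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            byte[] digest = md.digest(remaining.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    public static void main(String[] args) {
        String path = "/content/cassandra/p1/c1/c2";
        System.out.println(columnFamily(path));
        System.out.println(remainingPath(path));
        System.out.println(rowId(path).length()); // 20-byte digest -> 28 Base64 characters
    }
}
```

Because only the remainder is hashed, two paths that differ solely in their column family segment yield the same row key; the column family itself keeps them apart.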
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataAddLoadTest.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataAddLoadTest.java
new file mode 100644
index 0000000..0c54898
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataAddLoadTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.test.data.populator;
+
+
+import org.apache.commons.codec.binary.Base64;
+import me.prettyprint.cassandra.model.CqlQuery;
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.sling.cassandra.resource.provider.CassandraResource;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceProvider;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceResolver;
+import org.apache.sling.cassandra.resource.provider.util.CassandraResourceProviderUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Builds a load-test profile that adds
+ * 1K, 10K, 100K or 1M items to Cassandra, each set under one parent collection,
+ * e.g.
+ * /content/cassandra/1K/0   to /content/cassandra/1K/999
+ * /content/cassandra/10K/0   to /content/cassandra/10K/9999
+ * /content/cassandra/100K/0   to /content/cassandra/100K/99999
+ * /content/cassandra/1M/0   to /content/cassandra/1M/999999
+ * <p/>
+ * To add 1000 nodes as /content/cassandra/1K/0 to /content/cassandra/1K/999:
+ * set parentPath = "/content/cassandra/";
+ * set sizes = new int[]{1000};
+ * set cfs = new String[]{"1K"};
+ * and run testAddLoadTestData().
+ */
+
+public class CassandraDataAddLoadTest {
+    private static Logger LOGGER = LoggerFactory.getLogger(CassandraDataAddLoadTest.class);
+    private int count = 10;
+    private String[] cfs = new String[]{"A1", "B1", "C1", "D1"};
+    private int[] sizes = new int[]{10, 100, 1000, 10000};
+
+    private String parentPath = "/content/cassandra/";
+
+    public static void main(String[] args) {
+
+        String path="/content/cassandra/foo";
+       if(path.startsWith("/") && path.split("/").length > 4){
+           System.out.println(path.split("/").length);
+       } else {
+           System.out.println(">> "+path.split("/").length);
+       }
+//        new CassandraDataAddLoadTest().testAddLoadTestData();
+    }
+
+
+    public void testAddLoadTestData() {
+        try {
+
+            for (int k = 0; k < sizes.length; k++) {
+                CassandraResourceProvider cassandraResourceProvider = new CassandraResourceProvider();
+                createColumnFamily(cfs[k], cassandraResourceProvider.getKeyspace(), new StringSerializer());
+                cassandraResourceProvider.setColumnFamily(cfs[k]);
+                CassandraResourceResolver resolver = new CassandraResourceResolver();
+                for (int i = 0; i < sizes[k]; i++) {
+                    String path = parentPath + cfs[k] + "/" + i;
+                    Map<String, Object> map1 = new HashMap<String, Object>();
+                    map1.put("metadata", "resolutionPathInfo=json");
+                    map1.put("resourceType", "nt:cassandra0");
+                    map1.put("resourceSuperType", "nt:supercass1");
+                    cassandraResourceProvider.create(resolver, path, map1);
+                    cassandraResourceProvider.commit(resolver);
+                    LOGGER.debug("Added " + path);
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to add data to cassandra", e);
+            Assert.fail("Failed to add data to cassandra");
+        }
+
+    }
+
+    private void createColumnFamily(String cf, Keyspace keyspace, StringSerializer se) {
+        String createCF = "CREATE COLUMNFAMILY " + cf + " (KEY varchar PRIMARY KEY,path varchar,resourceType varchar,resourceSuperType varchar,metadata varchar);";
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+        cqlQuery.setQuery(createCF);
+        try {
+            QueryResult<CqlRows<String, String, String>> result1 = cqlQuery.execute();
+            LOGGER.info("Created column family " + cf + "; rows returned: " + result1.get().getList().size());
+        } catch (HInvalidRequestException ignore) {
+            LOGGER.debug("Column Family already exists " + ignore.getMessage());
+        }
+    }
+
+}
+
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataAddTest.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataAddTest.java
new file mode 100644
index 0000000..af3b032
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataAddTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.test.data.populator;
+
+
+import org.apache.commons.codec.binary.Base64;
+import org.junit.Assert;
+import me.prettyprint.cassandra.model.CqlQuery;
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceProvider;
+import org.apache.sling.cassandra.resource.provider.util.CassandraResourceProviderUtil;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class CassandraDataAddTest {
+    private static Logger LOGGER = LoggerFactory.getLogger(CassandraDataAddTest.class);
+
+    @Test
+    public void testAddData() {
+        String cf = "p1";
+        try {
+            String r1 = getrowID("/content/cassandra/" + cf + "/c1");
+            String r2 = getrowID("/content/cassandra/" + cf + "/c1/c2");
+            CassandraResourceProvider cassandraResourceProvider = new CassandraResourceProvider();
+            createColumnFamily(cf, cassandraResourceProvider.getKeyspace(), new StringSerializer());
+            cassandraResourceProvider.setColumnFamily(cf);
+
+            addData(cassandraResourceProvider.getKeyspace(), cf, new StringSerializer(),
+                    new String[]{
+                            "'" + r1 + "','/content/cassandra/" + cf + "/c1','nt:cassandra1','nt:supercass1','resolutionPathInfo=json'",
+                            "'" + r2 + "','/content/cassandra/" + cf + "/c1/c2','nt:cassandra','nt:supercass2','resolutionPathInfo=json'"
+                    });
+
+            Assert.assertTrue(true);
+        } catch (Exception e) {
+            LOGGER.error("Failed to add data to cassandra", e);
+            Assert.fail("Failed to add data to cassandra");
+        }
+
+    }
+
+    private String getrowID(String path) throws NoSuchAlgorithmException, UnsupportedEncodingException {
+        MessageDigest md = MessageDigest.getInstance("SHA1");
+        String rowID = new String(Base64.encodeBase64(md.digest(CassandraResourceProviderUtil.getRemainingPath(path).getBytes("UTF-8"))));
+        return rowID;
+    }
+
+    private void createColumnFamily(String cf, Keyspace keyspace, StringSerializer se) {
+        String createCF = "CREATE COLUMNFAMILY " + cf + " (KEY varchar PRIMARY KEY,path varchar,resourceType varchar,resourceSuperType varchar,metadata varchar);";
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+        cqlQuery.setQuery(createCF);
+        try {
+            QueryResult<CqlRows<String, String, String>> result1 = cqlQuery.execute();
+            LOGGER.info("Created column family " + cf + "; rows returned: " + result1.get().getList().size());
+        } catch (HInvalidRequestException ignore) {
+            LOGGER.debug("Column Family already exists " + ignore.getMessage());
+        }
+    }
+
+    private void addData(Keyspace keyspace, String cf, StringSerializer se, String[] dataArray) {
+        for (String data : dataArray) {
+            CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+            String query = "insert into " + cf + " (KEY,path,resourceType,resourceSuperType,metadata) values (" + data + ");";
+            cqlQuery.setQuery(query);
+            QueryResult<CqlRows<String, String, String>> result = cqlQuery.execute();
+            LOGGER.info("Added " + result.toString());
+        }
+    }
+}
+
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataChildNodeIterableTest.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataChildNodeIterableTest.java
new file mode 100644
index 0000000..bda9e3d
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataChildNodeIterableTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.test.data.populator;
+
+
+import org.apache.commons.codec.binary.Base64;
+import org.junit.Test;
+import org.junit.Assert;
+import me.prettyprint.cassandra.model.CqlQuery;
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceProvider;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceResolver;
+import org.apache.sling.cassandra.resource.provider.util.CassandraResourceProviderUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Iterator;
+
+public class CassandraDataChildNodeIterableTest {
+    private static Logger LOGGER = LoggerFactory.getLogger(CassandraDataChildNodeIterableTest.class);
+
+    @Test
+    public void testGetChildrenData() {
+        String cf = "p1";
+        try {
+            String r1 = getrowID("/content/cassandra/" + cf + "/c1");
+            String r2 = getrowID("/content/cassandra/" + cf + "/c1/c2");
+            CassandraResourceProvider cassandraResourceProvider = new CassandraResourceProvider();
+            createColumnFamily(cf, cassandraResourceProvider.getKeyspace(), new StringSerializer());
+            cassandraResourceProvider.setColumnFamily(cf);
+
+            addData(cassandraResourceProvider.getKeyspace(), cf, new StringSerializer(),
+                    new String[]{
+                            "'" + r1 + "','/content/cassandra/" + cf + "/c1','nt:cassandra1','nt:supercass1','resolutionPathInfo=json'",
+                            "'" + r2 + "','/content/cassandra/" + cf + "/c1/c2','nt:cassandra','nt:supercass2','resolutionPathInfo=json'"
+                    });
+
+            getChildrenData(cassandraResourceProvider, cf);
+        } catch (Exception e) {
+            LOGGER.error("Failed to read child nodes from cassandra", e);
+        }
+    }
+
+    private void getChildrenData(CassandraResourceProvider cassandraResourceProvider, String cf) {
+        Resource resource = cassandraResourceProvider.getResource(new CassandraResourceResolver(), "/content/cassandra/" + cf + "/c1");
+        Assert.assertNotNull(resource);
+        Iterable<Resource> iterableChildren = resource.getChildren();
+        Iterator<Resource> children = iterableChildren.iterator();
+        boolean hasChild = false;
+        while (children.hasNext()) {
+            Resource r = children.next();
+            if (r.getPath().equals("/content/cassandra/" + cf + "/c1/c2")) {
+                Assert.assertNotNull(r);
+                Assert.assertEquals("/content/cassandra/" + cf + "/c1/c2", r.getPath());
+                hasChild = true;
+            }
+        }
+        Assert.assertTrue(hasChild);
+    }
+
+    private String getrowID(String path) throws NoSuchAlgorithmException, UnsupportedEncodingException {
+        MessageDigest md = MessageDigest.getInstance("SHA1");
+        String rowID = new String(Base64.encodeBase64(md.digest(CassandraResourceProviderUtil.getRemainingPath(path).getBytes("UTF-8"))));
+        return rowID;
+    }
+
+    private void createColumnFamily(String cf, Keyspace keyspace, StringSerializer se) {
+        String createCF = "CREATE COLUMNFAMILY " + cf + " (KEY varchar PRIMARY KEY,path varchar,resourceType varchar,resourceSuperType varchar,metadata varchar);";
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+        cqlQuery.setQuery(createCF);
+        try {
+            QueryResult<CqlRows<String, String, String>> result1 = cqlQuery.execute();
+            LOGGER.info("Created column family " + cf + "; rows returned: " + result1.get().getList().size());
+        } catch (HInvalidRequestException ignore) {
+            LOGGER.debug("Column Family already exists " + ignore.getMessage());
+        }
+    }
+
+    private void addData(Keyspace keyspace, String cf, StringSerializer se, String[] dataArray) {
+        for (String data : dataArray) {
+            CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+            String query = "insert into " + cf + " (KEY,path,resourceType,resourceSuperType,metadata) values (" + data + ");";
+            cqlQuery.setQuery(query);
+            cqlQuery.execute();
+        }
+    }
+}
+
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataChildNodeTest.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataChildNodeTest.java
new file mode 100644
index 0000000..73cbffd
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataChildNodeTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.test.data.populator;
+
+
+import org.apache.commons.codec.binary.Base64;
+import org.junit.Assert;
+import org.junit.Test;
+import me.prettyprint.cassandra.model.CqlQuery;
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceProvider;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceResolver;
+import org.apache.sling.cassandra.resource.provider.util.CassandraResourceProviderUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Iterator;
+
+public class CassandraDataChildNodeTest{
+    private static Logger LOGGER = LoggerFactory.getLogger(CassandraDataChildNodeTest.class);
+
+    @Test
+    public void testGetChildrenData() {
+        String cf = "p1";
+        try {
+            String r1 = getrowID("/content/cassandra/" + cf + "/c1");
+            String r2 = getrowID("/content/cassandra/" + cf + "/c1/c2");
+            CassandraResourceProvider cassandraResourceProvider = new CassandraResourceProvider();
+            createColumnFamily(cf, cassandraResourceProvider.getKeyspace(), new StringSerializer());
+            cassandraResourceProvider.setColumnFamily(cf);
+
+            addData(cassandraResourceProvider.getKeyspace(), cf, new StringSerializer(),
+                           new String[]{
+                                   "'" + r1 + "','/content/cassandra/" + cf + "/c1','nt:cassandra1','nt:supercass1','resolutionPathInfo=json'",
+                                   "'" + r2 + "','/content/cassandra/" + cf + "/c1/c2','nt:cassandra','nt:supercass2','resolutionPathInfo=json'"
+                           });
+
+            getChildrenData(cassandraResourceProvider, cf);
+        } catch (Exception e) {
+            LOGGER.error("Failed to read child nodes from cassandra", e);
+        }
+    }
+
+    private void getChildrenData(CassandraResourceProvider cassandraResourceProvider, String cf) {
+        Resource resource = cassandraResourceProvider.getResource(new CassandraResourceResolver(), "/content/cassandra/" + cf + "/c1");
+        Assert.assertNotNull(resource);
+        Iterator<Resource> children = resource.listChildren();
+        boolean hasChild = false;
+        while (children.hasNext()) {
+            Resource r = children.next();
+            if (r.getPath().equals("/content/cassandra/" + cf + "/c1/c2")) {
+                Assert.assertNotNull(r);
+                Assert.assertEquals("/content/cassandra/" + cf + "/c1/c2", r.getPath());
+                hasChild = true;
+            }
+        }
+        Assert.assertTrue(hasChild);
+    }
+
+    private String getrowID(String path) throws NoSuchAlgorithmException, UnsupportedEncodingException {
+        MessageDigest md = MessageDigest.getInstance("SHA1");
+        String rowID = new String(Base64.encodeBase64(md.digest(CassandraResourceProviderUtil.getRemainingPath(path).getBytes("UTF-8"))));
+        return rowID;
+    }
+
+    private void createColumnFamily(String cf, Keyspace keyspace, StringSerializer se) {
+        String createCF = "CREATE COLUMNFAMILY " + cf + " (KEY varchar PRIMARY KEY,path varchar,resourceType varchar,resourceSuperType varchar,metadata varchar);";
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+        cqlQuery.setQuery(createCF);
+        try {
+            QueryResult<CqlRows<String, String, String>> result1 = cqlQuery.execute();
+            LOGGER.info("Created column family " + cf + "; rows returned: " + result1.get().getList().size());
+        } catch (HInvalidRequestException ignore) {
+            LOGGER.debug("Column Family already exists " + ignore.getMessage());
+        }
+    }
+
+    private void addData(Keyspace keyspace, String cf, StringSerializer se, String[] dataArray) {
+        for (String data : dataArray) {
+            CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+            String query = "insert into " + cf + " (KEY,path,resourceType,resourceSuperType,metadata) values (" + data + ");";
+            cqlQuery.setQuery(query);
+            cqlQuery.execute();
+        }
+    }
+}
+
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataParentNodeTest.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataParentNodeTest.java
new file mode 100644
index 0000000..7c36005
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataParentNodeTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.test.data.populator;
+
+
+import org.apache.commons.codec.binary.Base64;
+import org.junit.Assert;
+import org.junit.Test;
+import me.prettyprint.cassandra.model.CqlQuery;
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceProvider;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceResolver;
+import org.apache.sling.cassandra.resource.provider.util.CassandraResourceProviderUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class CassandraDataParentNodeTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraDataParentNodeTest.class);
+
+    @Test
+    public void testGetParentData() {
+        String cf = "p1";
+        try {
+
+            String r1 = getrowID("/content/cassandra/" + cf + "/c1");
+            String r2 = getrowID("/content/cassandra/" + cf + "/c1/c2");
+            CassandraResourceProvider cassandraResourceProvider = new CassandraResourceProvider();
+            createColumnFamily(cf, cassandraResourceProvider.getKeyspace(), new StringSerializer());
+            cassandraResourceProvider.setColumnFamily(cf);
+
+            addData(cassandraResourceProvider.getKeyspace(), cf, new StringSerializer(),
+                           new String[]{
+                                   "'" + r1 + "','/content/cassandra/" + cf + "/c1','nt:cassandra1','nt:supercass1','resolutionPathInfo=json'",
+                                   "'" + r2 + "','/content/cassandra/" + cf + "/c1/c2','nt:cassandra','nt:supercass2','resolutionPathInfo=json'"
+                           });
+
+            getParentData(cassandraResourceProvider, cf);
+        } catch (Exception e) {
+            LOGGER.info("Ignoring error: " + e.getMessage());
+        }
+
+
+    }
+
+    private void getParentData(CassandraResourceProvider cassandraResourceProvider, String cf) {
+        Resource resource = cassandraResourceProvider.getResource(new CassandraResourceResolver(), "/content/cassandra/" + cf + "/c1/c2");
+        Assert.assertNotNull(resource);
+        Resource parent = resource.getParent();
+        Assert.assertNotNull(parent);
+        Assert.assertEquals("/content/cassandra/" + cf + "/c1",parent.getPath());
+    }
+
+    private String getrowID(String path) throws NoSuchAlgorithmException, UnsupportedEncodingException {
+        MessageDigest md = MessageDigest.getInstance("SHA1");
+        String rowID = new String(Base64.encodeBase64(md.digest(CassandraResourceProviderUtil.getRemainingPath(path).getBytes("UTF-8"))));
+        return rowID;
+    }
+
+    private void createColumnFamily(String cf, Keyspace keyspace, StringSerializer se) {
+        String createCF = "CREATE COLUMNFAMILY " + cf + " (KEY varchar PRIMARY KEY,path varchar,resourceType varchar,resourceSuperType varchar,metadata varchar);";
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+        cqlQuery.setQuery(createCF);
+        try {
+            QueryResult<CqlRows<String, String, String>> result1 = cqlQuery.execute();
+            LOGGER.info(result1.get().getList().size() + " Finished.");
+        } catch (HInvalidRequestException ignore) {
+            LOGGER.debug("Column Family already exists " + ignore.getMessage());
+        }
+    }
+
+    private void addData(Keyspace keyspace, String cf, StringSerializer se, String[] dataArray) {
+        for (String data : dataArray) {
+            CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+            String query = "insert into " + cf + " (KEY,path,resourceType,resourceSuperType,metadata) values (" + data + ");";
+            cqlQuery.setQuery(query);
+            cqlQuery.execute();
+        }
+    }
+}
+
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataReadTest.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataReadTest.java
new file mode 100644
index 0000000..fbdb9d1
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraDataReadTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.test.data.populator;
+
+
+import org.apache.commons.codec.binary.Base64;
+import org.junit.Assert;
+import org.junit.Test;
+import me.prettyprint.cassandra.model.CqlQuery;
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceProvider;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceResolver;
+import org.apache.sling.cassandra.resource.provider.util.CassandraResourceProviderUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class CassandraDataReadTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraDataReadTest.class);
+
+    @Test
+    public void testReadData() {
+        String cf = "p1";
+        try {
+
+            String r1 = getrowID("/content/cassandra/" + cf + "/c1");
+            String r2 = getrowID("/content/cassandra/" + cf + "/c1/c2");
+
+            CassandraResourceProvider cassandraResourceProvider = new CassandraResourceProvider();
+            createColumnFamily(cf, cassandraResourceProvider.getKeyspace(), new StringSerializer());
+            cassandraResourceProvider.setColumnFamily(cf);
+
+            addData(cassandraResourceProvider.getKeyspace(), cf, new StringSerializer(),
+                           new String[]{
+                                   "'" + r1 + "','/content/cassandra/" + cf + "/c1','nt:cassandra1','nt:supercass1','resolutionPathInfo=json'",
+                                   "'" + r2 + "','/content/cassandra/" + cf + "/c1/c2','nt:cassandra','nt:supercass2','resolutionPathInfo=json'"
+                           });
+
+            readData(cassandraResourceProvider, cf);
+        } catch (Exception e) {
+            LOGGER.info("Ignoring error: " + e.getMessage());
+        }
+
+
+    }
+
+    private void readData(CassandraResourceProvider cassandraResourceProvider, String cf) {
+        Resource resource = cassandraResourceProvider.getResource(new CassandraResourceResolver(), "/content/cassandra/" + cf + "/c1/c2");
+        Assert.assertNotNull(resource);
+    }
+
+
+    private String getrowID(String path) throws NoSuchAlgorithmException, UnsupportedEncodingException {
+        MessageDigest md = MessageDigest.getInstance("SHA1");
+        String rowID = new String(Base64.encodeBase64(md.digest(CassandraResourceProviderUtil.getRemainingPath(path).getBytes("UTF-8"))));
+        return rowID;
+    }
+
+    private void createColumnFamily(String cf, Keyspace keyspace, StringSerializer se) {
+        String createCF = "CREATE COLUMNFAMILY " + cf + " (KEY varchar PRIMARY KEY,path varchar,resourceType varchar,resourceSuperType varchar,metadata varchar);";
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+        cqlQuery.setQuery(createCF);
+        try {
+            QueryResult<CqlRows<String, String, String>> result1 = cqlQuery.execute();
+            LOGGER.info(result1.get().getList().size() + " Finished.");
+        } catch (HInvalidRequestException ignore) {
+            LOGGER.debug("Column Family already exists " + ignore.getMessage());
+        }
+    }
+
+    private void addData(Keyspace keyspace, String cf, StringSerializer se, String[] dataArray) {
+        for (String data : dataArray) {
+            CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+            String query = "insert into " + cf + " (KEY,path,resourceType,resourceSuperType,metadata) values (" + data + ");";
+            cqlQuery.setQuery(query);
+            QueryResult<CqlRows<String, String, String>> result = cqlQuery.execute();
+            LOGGER.info("Added " + result.toString());
+        }
+    }
+}
+
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraModifyResourceProviderAddTest.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraModifyResourceProviderAddTest.java
new file mode 100644
index 0000000..799be10
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraModifyResourceProviderAddTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.test.data.populator;
+
+
+import org.apache.commons.codec.binary.Base64;
+import me.prettyprint.cassandra.model.CqlQuery;
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceProvider;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceResolver;
+import org.apache.sling.cassandra.resource.provider.util.CassandraResourceProviderUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CassandraModifyResourceProviderAddTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraModifyResourceProviderAddTest.class);
+
+    @Test
+    public void testAddData() {
+        String cf = "p2";
+        try {
+            String path1 ="/content/cassandra/" + cf + "/c1";
+
+            CassandraResourceProvider cassandraResourceProvider = new CassandraResourceProvider();
+            createColumnFamily(cf, cassandraResourceProvider.getKeyspace(), new StringSerializer());
+            cassandraResourceProvider.setColumnFamily(cf);
+
+            Map<String,Object> map1 = new HashMap<String, Object>();
+            map1.put("metadata", "resolutionPathInfo=json");
+            map1.put("resourceType", "nt:cassandra0");
+            map1.put("resourceSuperType", "nt:supercass1");
+
+            CassandraResourceResolver resolver =  new CassandraResourceResolver();
+            cassandraResourceProvider.create(resolver,path1,map1);
+            Assert.assertNull("Before committing, the resource should be null", cassandraResourceProvider.getResource(resolver, path1));
+            cassandraResourceProvider.commit(resolver);
+            Assert.assertNotNull("Committed resource cannot be null", cassandraResourceProvider.getResource(resolver, path1));
+
+        } catch (Exception e) {
+            LOGGER.error("Add test failed: " + e.getMessage(), e);
+            Assert.fail("Failed to add data to Cassandra");
+        }
+
+    }
+
+    private void createColumnFamily(String cf, Keyspace keyspace, StringSerializer se) {
+        String createCF = "CREATE COLUMNFAMILY " + cf + " (KEY varchar PRIMARY KEY,path varchar,resourceType varchar,resourceSuperType varchar,metadata varchar);";
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+        cqlQuery.setQuery(createCF);
+        try {
+            QueryResult<CqlRows<String, String, String>> result1 = cqlQuery.execute();
+            LOGGER.info(result1.get().getList().size() + " Finished.");
+        } catch (HInvalidRequestException ignore) {
+            LOGGER.debug("Column Family already exists " + ignore.getMessage());
+        }
+    }
+
+}
+
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraModifyResourceProviderDeleteTest.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraModifyResourceProviderDeleteTest.java
new file mode 100644
index 0000000..796fe1f
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraModifyResourceProviderDeleteTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.test.data.populator;
+
+
+import me.prettyprint.cassandra.model.CqlQuery;
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceProvider;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceResolver;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CassandraModifyResourceProviderDeleteTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraModifyResourceProviderDeleteTest.class);
+
+    @Test
+    public void testDeleteData() {
+        String cf = "p3";
+        try {
+            String path1 ="/content/cassandra/" + cf + "/c1";
+
+            CassandraResourceProvider cassandraResourceProvider = new CassandraResourceProvider();
+            createColumnFamily(cf, cassandraResourceProvider.getKeyspace(), new StringSerializer());
+            cassandraResourceProvider.setColumnFamily(cf);
+
+            Map<String,Object> map1 = new HashMap<String, Object>();
+            map1.put("metadata", "resolutionPathInfo=json");
+            map1.put("resourceType", "nt:cassandra0");
+            map1.put("resourceSuperType", "nt:supercass1");
+
+            CassandraResourceResolver resolver =  new CassandraResourceResolver();
+            cassandraResourceProvider.create(resolver,path1,map1);
+            Assert.assertNull("Before committing, the resource should be null", cassandraResourceProvider.getResource(resolver, path1));
+            cassandraResourceProvider.commit(resolver);
+            Assert.assertNotNull("Committed resource cannot be null", cassandraResourceProvider.getResource(resolver, path1));
+
+            cassandraResourceProvider.delete(resolver,path1);
+            Assert.assertNotNull("Uncommitted deleted resource cannot be null", cassandraResourceProvider.getResource(resolver, path1));
+            cassandraResourceProvider.commit(resolver);
+            Assert.assertNull("Deleted resource should be null", cassandraResourceProvider.getResource(resolver, path1));
+
+        } catch (Exception e) {
+            LOGGER.error("Delete test failed: " + e.getMessage(), e);
+            Assert.fail("Failed to delete data from Cassandra");
+        }
+
+    }
+
+    private void createColumnFamily(String cf, Keyspace keyspace, StringSerializer se) {
+        String createCF = "CREATE COLUMNFAMILY " + cf + " (KEY varchar PRIMARY KEY,path varchar,resourceType varchar,resourceSuperType varchar,metadata varchar);";
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+        cqlQuery.setQuery(createCF);
+        try {
+            QueryResult<CqlRows<String, String, String>> result1 = cqlQuery.execute();
+            LOGGER.info(result1.get().getList().size() + " Finished.");
+        } catch (HInvalidRequestException ignore) {
+            LOGGER.debug("Column Family already exists " + ignore.getMessage());
+        }
+    }
+
+}
+
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraModifyResourceProviderRevertTest.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraModifyResourceProviderRevertTest.java
new file mode 100644
index 0000000..fd937e4
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraModifyResourceProviderRevertTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.cassandra.test.data.populator;
+
+
+import me.prettyprint.cassandra.model.CqlQuery;
+import me.prettyprint.cassandra.model.CqlRows;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
+import me.prettyprint.hector.api.query.QueryResult;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceProvider;
+import org.apache.sling.cassandra.resource.provider.CassandraResourceResolver;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CassandraModifyResourceProviderRevertTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraModifyResourceProviderRevertTest.class);
+
+    @Test
+    public void testRevertData() {
+        String cf = "p4";
+        try {
+            String path1 ="/content/cassandra/" + cf + "/c1";
+
+            CassandraResourceProvider cassandraResourceProvider = new CassandraResourceProvider();
+            createColumnFamily(cf, cassandraResourceProvider.getKeyspace(), new StringSerializer());
+            cassandraResourceProvider.setColumnFamily(cf);
+
+            Map<String,Object> map1 = new HashMap<String, Object>();
+            map1.put("metadata", "resolutionPathInfo=json");
+            map1.put("resourceType", "nt:cassandra0");
+            map1.put("resourceSuperType", "nt:supercass1");
+
+            CassandraResourceResolver resolver =  new CassandraResourceResolver();
+            cassandraResourceProvider.create(resolver,path1,map1);
+            Assert.assertNull("Before committing, the resource should be null", cassandraResourceProvider.getResource(resolver, path1));
+            cassandraResourceProvider.revert(resolver);
+            Assert.assertNull("Reverted resource should be null", cassandraResourceProvider.getResource(resolver, path1));
+
+        } catch (Exception e) {
+            LOGGER.error("Revert test failed: " + e.getMessage(), e);
+            Assert.fail("Failed to revert data in Cassandra");
+        }
+
+    }
+
+    private void createColumnFamily(String cf, Keyspace keyspace, StringSerializer se) {
+        String createCF = "CREATE COLUMNFAMILY " + cf + " (KEY varchar PRIMARY KEY,path varchar,resourceType varchar,resourceSuperType varchar,metadata varchar);";
+        CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(keyspace, se, se, se);
+        cqlQuery.setQuery(createCF);
+        try {
+            QueryResult<CqlRows<String, String, String>> result1 = cqlQuery.execute();
+            LOGGER.info(result1.get().getList().size() + " Finished.");
+        } catch (HInvalidRequestException ignore) {
+            LOGGER.debug("Column Family already exists " + ignore.getMessage());
+        }
+    }
+
+}
+
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraResourceProviderPerformanceTestClient.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraResourceProviderPerformanceTestClient.java
new file mode 100644
index 0000000..ed1255e
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/CassandraResourceProviderPerformanceTestClient.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sling.cassandra.test.data.populator;
+
+import org.apache.sling.commons.testing.integration.HttpTestBase;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Measures HTTP read latency for resources served by the Cassandra resource provider.
+ */
+public class CassandraResourceProviderPerformanceTestClient extends HttpTestBase {
+
+    private static List<String> exerciseList = new ArrayList<String>();
+    private static List<String> testList = new ArrayList<String>();
+    private static List<Long> firstRun = new ArrayList<Long>();
+    private static List<Long> secondRun = new ArrayList<Long>();
+
+    private static File file = new File(System.getProperty("java.io.tmpdir"), "CassandraLatencyReport.txt");
+    private static FileWriter fileWriter;
+    static {
+        loadTestLists();
+        if(!file.exists()){
+            try {
+                file.createNewFile();
+            } catch (IOException ignore) {
+            }
+        }
+        try {
+            fileWriter = new FileWriter(file, true);
+        } catch (IOException ignore) {
+        }
+
+    }
+
+    private static void loadTestLists() {
+        int exerciseCount = 0;
+        int testCount = 0;
+
+        for (int i = 0; i < 1000; i++) {
+            if (i % 2 == 0) {
+                if (exerciseCount < 100) {
+                    exerciseList.add(String.valueOf(i));
+                    System.out.println("Exercise: " + i);
+                    exerciseCount++;
+                }
+            } else {
+                if (testCount < 100) {
+                    testList.add(String.valueOf(i));
+                    System.out.println("Test: " + i);
+                    testCount++;
+                }
+            }
+        }
+
+    }
+
+    private void getAllExerciseData(String parentNode) throws IOException {
+        for (String s : exerciseList) {
+            getContent(HTTP_BASE_URL + "/content/cassandra/" + parentNode + "/" + s + ".json", CONTENT_TYPE_JSON);
+        }
+    }
+    private void writeToFile(String s) throws IOException {
+        fileWriter.write(s + "\n");
+    }
+
+    private void getAllTestData(String parentNode, List<Long> result, Map<String, Long> nodeMap) throws IOException {
+        writeToFile("#######################################################################################################################");
+        writeToFile("############################ HTTP Latency Report on Test Data under " + parentNode + " ################################");
+        writeToFile("#######################################################################################################################");
+
+        long subTotal = 0;
+        for (String s : testList) {
+            long startTime = System.currentTimeMillis();
+            String url = HTTP_BASE_URL + "/content/cassandra/" + parentNode + "/" + s + ".json";
+            String content = getContent(url, CONTENT_TYPE_JSON);
+            long latency = System.currentTimeMillis() - startTime;
+            result.add(latency);
+            subTotal = subTotal + latency;
+            writeToFile("[TEST] Latency: " + latency + " (ms) HTTP URI " + url);
+        }
+        nodeMap.put(parentNode,subTotal);
+    }
+
+    public void testMovieResource() throws Exception {
+
+        String[] nodes = new String[]{"A", "B", "C", "D","E","F"};
+        Map<String,Long> firstNodeMap = new HashMap<String,Long>();
+        Map<String,Long> secondNodeMap = new HashMap<String,Long>();
+
+        writeToFile("============================================ FIRST RUN ============================================================");
+        for (String s : nodes) {
+            getAllExerciseData(s);
+            getAllTestData(s,firstRun,firstNodeMap);
+        }
+        writeToFile("============================================ SECOND RUN ============================================================");
+        for (String s : nodes) {
+            getAllExerciseData(s);
+            getAllTestData(s, secondRun, secondNodeMap);
+        }
+
+        long firstRunTotal = 0;
+        long secondRunTotal = 0;
+
+        for(long l1:firstRun){
+            firstRunTotal = firstRunTotal + l1;
+        }
+
+        for(long l2:secondRun){
+            secondRunTotal = secondRunTotal + l2;
+        }
+
+        writeToFile("===========================================================================================================================");
+        writeToFile("========================================== FIRST RUN TEST SUMMARY =========================================================");
+        writeToFile("[RESULT] Average Latency Under Node A(1K)   = " + firstNodeMap.get("A")/testList.size() +" (ms)");
+        writeToFile("[RESULT] Average Latency Under Node B(10K)  = " + firstNodeMap.get("B")/testList.size() +" (ms)");
+        writeToFile("[RESULT] Average Latency Under Node C(100K) = " + firstNodeMap.get("C")/testList.size() +" (ms)");
+        writeToFile("[RESULT] Average Latency Under Node D(1M)   = " + firstNodeMap.get("D")/testList.size() +" (ms)");
+        writeToFile("[RESULT] Average Latency Under Node E(10M)  = " + firstNodeMap.get("E")/testList.size() +" (ms)");
+        writeToFile("[RESULT] Average Latency Under Node F(100M) = " + firstNodeMap.get("F")/testList.size() +" (ms)");
+        writeToFile("[FIRST RUN] #TOTAL CALLS = " + firstRun.size() + " Total Average Latency = " + firstRunTotal/firstRun.size() + " (ms)");
+        writeToFile("===========================================================================================================================");
+        writeToFile("========================================== SECOND RUN TEST SUMMARY =========================================================");
+        writeToFile("[RESULT] Average Latency Under Node A(1K)   = " + secondNodeMap.get("A")/testList.size() +" (ms)");
+        writeToFile("[RESULT] Average Latency Under Node B(10K)  = " + secondNodeMap.get("B")/testList.size() +" (ms)");
+        writeToFile("[RESULT] Average Latency Under Node C(100K) = " + secondNodeMap.get("C")/testList.size() +" (ms)");
+        writeToFile("[RESULT] Average Latency Under Node D(1M)   = " + secondNodeMap.get("D")/testList.size() +" (ms)");
+        writeToFile("[RESULT] Average Latency Under Node E(10M)  = " + secondNodeMap.get("E")/testList.size() +" (ms)");
+        writeToFile("[RESULT] Average Latency Under Node F(100M) = " + secondNodeMap.get("F")/testList.size() +" (ms)");
+        writeToFile("[SECOND RUN] #TOTAL CALLS = " + secondRun.size() + " Total Average Latency = " + secondRunTotal/secondRun.size() + " (ms)");
+        writeToFile("===========================================================================================================================");
+        fileWriter.close();
+    }
+
+
+}
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/perf/CassandraResourceProviderCreateTest.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/perf/CassandraResourceProviderCreateTest.java
new file mode 100644
index 0000000..c1a5465
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/perf/CassandraResourceProviderCreateTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sling.cassandra.test.data.populator.perf;
+
+import org.apache.sling.commons.testing.integration.HttpTestBase;
+import org.apache.sling.servlets.post.SlingPostConstants;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Measures HTTP create latency for nodes served by the Cassandra resource provider
+ */
+public class CassandraResourceProviderCreateTest extends HttpTestBase {
+
+    private static List<String> exerciseList = new ArrayList<String>();
+    private static List<String> testList = new ArrayList<String>();
+    private static List<Long> firstRun = new ArrayList<Long>();
+    private static List<Long> secondRun = new ArrayList<Long>();
+
+    private static File file = new File("/home/dishara/mystuff/sling/svn/slinglatest/launchpad/integration-tests/CassandraCreateLatencyReport.txt");
+    private static FileWriter fileWriter;
+
+    static {
+        loadTestLists();
+        if (!file.exists()) {
+            try {
+                file.createNewFile();
+            } catch (IOException ignore) {
+            }
+        }
+        try {
+            fileWriter = new FileWriter(file, true);
+        } catch (IOException ignore) {
+        }
+
+    }
+
+    private static void loadTestLists() {
+        int exerciseCount = 0;
+        int testCount = 0;
+
+        for (int i = 0; i < 1000; i++) {
+            if (i % 2 == 0) {
+                if (exerciseCount < 100) {
+                    exerciseList.add(String.valueOf(i));
+                    System.out.println("Exercise: " + i);
+                    exerciseCount++;
+                }
+            } else {
+                if (testCount < 100) {
+                    testList.add(String.valueOf(i));
+                    System.out.println("Test: " + i);
+                    testCount++;
+                }
+            }
+        }
+
+    }
+
+    private void createAllExerciseData(String parentNode) throws IOException {
+        Map<String, String> props = new HashMap<String, String>();
+        props.put(SlingPostConstants.RP_CONTENT_TYPE, "json");
+        Map<String, String> props1 = new HashMap<String, String>();
+        props1.put("metadata", "resolutionPathInfo=json");
+        props1.put("resourceType", "nt:cassandra0");
+        props1.put("resourceSuperType", "nt:supercass1");
+        props.put(SlingPostConstants.RP_CHECKIN, "true");
+
+        for (String s : exerciseList) {
+            testClient.createNode(HTTP_BASE_URL + "/content/cassandra/" + parentNode + "/" + s, props, props1, true);
+        }
+
+    }
+
+    private void writeToFile(String s) throws IOException {
+        fileWriter.write(s + "\n");
+    }
+
+    private void createAllTestData(String parentNode, List<Long> result, Map<String, Long> nodeMap) throws IOException {
+        Map<String, String> props = new HashMap<String, String>();
+        props.put(SlingPostConstants.RP_CONTENT_TYPE, "json");
+        Map<String, String> props1 = new HashMap<String, String>();
+        props1.put("metadata", "resolutionPathInfo=json");
+        props1.put("resourceType", "nt:cassandra0");
+        props1.put("resourceSuperType", "nt:supercass1");
+        props.put(SlingPostConstants.RP_CHECKIN, "true");
+
+        writeToFile("#######################################################################################################################");
+        writeToFile("############################ HTTP CUD Latency Report on Test Data under " + parentNode + " ################################");
+        writeToFile("#######################################################################################################################");
+
+        long subTotal = 0;
+
+        for (String s : testList) {
+            long startTime = System.currentTimeMillis();
+
+            String url = HTTP_BASE_URL + "/content/cassandra/" + parentNode + "/" + s;
+            testClient.createNode(url, props, props1, true);
+            long latency = System.currentTimeMillis() - startTime;
+            result.add(latency);
+            subTotal = subTotal + latency;
+            writeToFile("[TEST] Latency: " + latency + " (ms) HTTP URI " + url);
+            System.out.println(url);
+        }
+        nodeMap.put(parentNode, subTotal);
+    }
+
+    public void testMovieResource() throws Exception {
+        String[] nodes = new String[]{"LA", "MA", "SA"};
+//        String[] nodes = new String[]{"A", "B", "C", "D", "E", "F"};
+        Map<String, Long> firstNodeMap = new HashMap<String, Long>();
+        Map<String, Long> secondNodeMap = new HashMap<String, Long>();
+
+        writeToFile("============================================ FIRST RUN ============================================================");
+        for (String s : nodes) {
+            createAllTestData(s, firstRun, firstNodeMap);
+        }
+        long firstRunTotal = 0;
+        long secondRunTotal = 0;
+
+        for (long l1 : firstRun) {
+            firstRunTotal = firstRunTotal + l1;
+        }
+
+        for (long l2 : secondRun) {
+            secondRunTotal = secondRunTotal + l2;
+        }
+
+        writeToFile("===========================================================================================================================");
+        writeToFile("========================================== FIRST RUN TEST SUMMARY==========================================================");
+        writeToFile("[RESULT] Average Latency Under Node A(1K)   = " + firstNodeMap.get("LA") / testList.size() + " (ms)");
+        writeToFile("[RESULT] Average Latency Under Node B(10K)  = " + firstNodeMap.get("MA") / testList.size() + " (ms)");
+        writeToFile("[RESULT] Average Latency Under Node C(100K) = " + firstNodeMap.get("SA") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node D(1M)   = " + firstNodeMap.get("D") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node E(10M)  = " + firstNodeMap.get("E") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node F(100M) = " + firstNodeMap.get("F") / testList.size() + " (ms)");
+        writeToFile("[FIRST RUN] #TOTAL CALLS = " + firstRun.size() + " Total Average Latency = " + firstRunTotal / firstRun.size() + " (ms)");
+        writeToFile("===========================================================================================================================");
+//        writeToFile("========================================== SECOND RUN TEST SUMMARY==========================================================");
+//        writeToFile("[RESULT] Average Latency Under Node A(1K)   = " + secondNodeMap.get("LA") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node B(10K)  = " + secondNodeMap.get("MA") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node C(100K) = " + secondNodeMap.get("SA") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node D(1M)   = " + secondNodeMap.get("D") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node E(10M)  = " + secondNodeMap.get("E") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node F(100M) = " + secondNodeMap.get("F") / testList.size() + " (ms)");
+//        writeToFile("[SECOND RUN] #TOTAL CALLS = " + secondRun.size() + " Total Average Latency = " + secondRunTotal / secondRun.size() + " (ms)");
+        writeToFile("===========================================================================================================================");
+        fileWriter.close();
+    }
+
+
+}
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/perf/CassandraResourceProviderDeleteTest.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/perf/CassandraResourceProviderDeleteTest.java
new file mode 100644
index 0000000..df90af6
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/perf/CassandraResourceProviderDeleteTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sling.cassandra.test.data.populator.perf;
+
+import org.apache.sling.commons.testing.integration.HttpTestBase;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Measures HTTP delete latency for nodes served by the Cassandra resource provider
+ */
+public class CassandraResourceProviderDeleteTest extends HttpTestBase {
+
+    private static List<String> exerciseList = new ArrayList<String>();
+    private static List<String> testList = new ArrayList<String>();
+    private static List<Long> firstRun = new ArrayList<Long>();
+    private static List<Long> secondRun = new ArrayList<Long>();
+
+    private static File file = new File("/home/dishara/mystuff/sling/svn/slinglatest/launchpad/integration-tests/CassandraDeleteLatencyReport.txt");
+    private static FileWriter fileWriter;
+
+    static {
+        loadTestLists();
+        if (!file.exists()) {
+            try {
+                file.createNewFile();
+            } catch (IOException ignore) {
+            }
+        }
+        try {
+            fileWriter = new FileWriter(file, true);
+        } catch (IOException ignore) {
+        }
+
+    }
+
+    private static void loadTestLists() {
+        int exerciseCount = 0;
+        int testCount = 0;
+
+        for (int i = 0; i < 1000; i++) {
+            if (i % 2 == 0) {
+                if (exerciseCount < 100) {
+                    exerciseList.add(String.valueOf(i));
+                    System.out.println("Exercise: " + i);
+                    exerciseCount++;
+                }
+            } else {
+                if (testCount < 100) {
+                    testList.add(String.valueOf(i));
+                    System.out.println("Test: " + i);
+                    testCount++;
+                }
+            }
+        }
+
+    }
+
+    private void writeToFile(String s) throws IOException {
+        fileWriter.write(s + "\n");
+    }
+
+    private void deleteAllTestData(String parentNode, List<Long> result, Map<String, Long> nodeMap) throws IOException {
+        writeToFile("#######################################################################################################################");
+        writeToFile("############################ HTTP CUD Latency Report on Test Data under " + parentNode + " ################################");
+        writeToFile("#######################################################################################################################");
+
+        long subTotal = 0;
+
+        for (String s : testList) {
+            long startTime = System.currentTimeMillis();
+
+            String url = HTTP_BASE_URL + "/content/cassandra/" + parentNode + "/" + s;
+            testClient.delete(url);
+            long latency = System.currentTimeMillis() - startTime;
+            result.add(latency);
+            subTotal = subTotal + latency;
+            writeToFile("[TEST] Latency: " + latency + " (ms) HTTP URI " + url);
+            System.out.println(url);
+        }
+        nodeMap.put(parentNode, subTotal);
+    }
+
+    public void testMovieResource() throws Exception {
+        String[] nodes = new String[]{"LA", "MA", "SA"};
+        Map<String, Long> firstNodeMap = new HashMap<String, Long>();
+
+        writeToFile("============================================ FIRST RUN ============================================================");
+        for (String s : nodes) {
+            deleteAllTestData(s, firstRun, firstNodeMap);
+        }
+        long firstRunTotal = 0;
+        long secondRunTotal = 0;
+
+        for (long l1 : firstRun) {
+            firstRunTotal = firstRunTotal + l1;
+        }
+
+        for (long l2 : secondRun) {
+            secondRunTotal = secondRunTotal + l2;
+        }
+
+        writeToFile("===========================================================================================================================");
+        writeToFile("========================================== FIRST RUN TEST SUMMARY==========================================================");
+        writeToFile("[RESULT] Average Latency Under Node A(1K)   = " + firstNodeMap.get("LA") / testList.size() + " (ms)");
+        writeToFile("[RESULT] Average Latency Under Node B(10K)  = " + firstNodeMap.get("MA") / testList.size() + " (ms)");
+        writeToFile("[RESULT] Average Latency Under Node C(100K) = " + firstNodeMap.get("SA") / testList.size() + " (ms)");
+        writeToFile("[FIRST RUN] #TOTAL CALLS = " + firstRun.size() + " Total Average Latency = " + firstRunTotal / firstRun.size() + " (ms)");
+        writeToFile("===========================================================================================================================");
+        writeToFile("===========================================================================================================================");
+        fileWriter.close();
+    }
+
+
+}
diff --git a/src/test/java/org/apache/sling/cassandra/test/data/populator/perf/CassandraResourceProviderUpdateTest.java b/src/test/java/org/apache/sling/cassandra/test/data/populator/perf/CassandraResourceProviderUpdateTest.java
new file mode 100644
index 0000000..2ffb7cf
--- /dev/null
+++ b/src/test/java/org/apache/sling/cassandra/test/data/populator/perf/CassandraResourceProviderUpdateTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sling.cassandra.test.data.populator.perf;
+
+import org.apache.sling.commons.testing.integration.HttpTestBase;
+import org.apache.sling.servlets.post.SlingPostConstants;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Measures HTTP update latency by posting properties to nodes under /content/cassandra
+ */
+public class CassandraResourceProviderUpdateTest extends HttpTestBase {
+
+    private static List<String> exerciseList = new ArrayList<String>();
+    private static List<String> testList = new ArrayList<String>();
+    private static List<Long> firstRun = new ArrayList<Long>();
+    private static List<Long> secondRun = new ArrayList<Long>();
+
+    private static File file = new File("/home/dishara/mystuff/sling/svn/slinglatest/launchpad/integration-tests/CassandraUpdateLatencyReport.txt");
+    private static FileWriter fileWriter;
+
+    static {
+        loadTestLists();
+        if (!file.exists()) {
+            try {
+                file.createNewFile();
+            } catch (IOException ignore) {
+            }
+        }
+        try {
+            fileWriter = new FileWriter(file, true);
+        } catch (IOException ignore) {
+        }
+
+    }
+
+    private static void loadTestLists() {
+        int exerciseCount = 0;
+        int testCount = 0;
+
+        for (int i = 0; i < 1000; i++) {
+            if (i % 2 == 0) {
+                if (exerciseCount < 100) {
+                    exerciseList.add(String.valueOf(i));
+                    System.out.println("Exercise: " + i);
+                    exerciseCount++;
+                }
+            } else {
+                if (testCount < 100) {
+                    testList.add(String.valueOf(i));
+                    System.out.println("Test: " + i);
+                    testCount++;
+                }
+            }
+        }
+
+    }
+
+    private void createAllExerciseData(String parentNode) throws IOException {
+        Map<String, String> props = new HashMap<String, String>();
+        props.put(SlingPostConstants.RP_CONTENT_TYPE, "json");
+        Map<String, String> props1 = new HashMap<String, String>();
+        props1.put("metadata", "resolutionPathInfo=json");
+        props1.put("resourceType", "nt:cassandra9");
+        props1.put("resourceSuperType", "nt:supercass9");
+        props.put(SlingPostConstants.RP_CHECKIN, "true");
+
+        for (String s : exerciseList) {
+            testClient.createNode(HTTP_BASE_URL + "/content/cassandra/" + parentNode + "/" + s, props, props1, true);
+        }
+
+    }
+
+    private void writeToFile(String s) throws IOException {
+        fileWriter.write(s + "\n");
+    }
+
+    private void createAllTestData(String parentNode, List<Long> result, Map<String, Long> nodeMap) throws IOException {
+        Map<String, String> props = new HashMap<String, String>();
+        props.put(SlingPostConstants.RP_CONTENT_TYPE, "json");
+        Map<String, String> props1 = new HashMap<String, String>();
+        props1.put("metadata", "resolutionPathInfo=json");
+        props1.put("resourceType", "nt:cassandra9");
+        props1.put("resourceSuperType", "nt:supercass9");
+        props.put(SlingPostConstants.RP_CHECKIN, "true");
+
+        writeToFile("#######################################################################################################################");
+        writeToFile("############################ HTTP CUD Latency Report on Test Data under " + parentNode + " ################################");
+        writeToFile("#######################################################################################################################");
+
+        long subTotal = 0;
+
+        for (String s : testList) {
+            long startTime = System.currentTimeMillis();
+
+            String url = HTTP_BASE_URL + "/content/cassandra/" + parentNode + "/" + s;
+            testClient.createNode(url, props, props1, true);
+            long latency = System.currentTimeMillis() - startTime;
+            result.add(latency);
+            subTotal = subTotal + latency;
+            writeToFile("[TEST] Latency: " + latency + " (ms) HTTP URI " + url);
+            System.out.println(url);
+        }
+        nodeMap.put(parentNode, subTotal);
+    }
+
+    public void testMovieResource() throws Exception {
+        String[] nodes = new String[]{"LA", "MA", "SA"};
+        Map<String, Long> firstNodeMap = new HashMap<String, Long>();
+
+        writeToFile("============================================ FIRST RUN ============================================================");
+        for (String s : nodes) {
+            createAllTestData(s, firstRun, firstNodeMap);
+        }
+        long firstRunTotal = 0;
+        long secondRunTotal = 0;
+
+        for (long l1 : firstRun) {
+            firstRunTotal = firstRunTotal + l1;
+        }
+
+        for (long l2 : secondRun) {
+            secondRunTotal = secondRunTotal + l2;
+        }
+
+        writeToFile("===========================================================================================================================");
+        writeToFile("========================================== FIRST RUN TEST SUMMARY==========================================================");
+        writeToFile("[RESULT] Average Latency Under Node A(1K)   = " + firstNodeMap.get("LA") / testList.size() + " (ms)");
+        writeToFile("[RESULT] Average Latency Under Node B(10K)  = " + firstNodeMap.get("MA") / testList.size() + " (ms)");
+        writeToFile("[RESULT] Average Latency Under Node C(100K) = " + firstNodeMap.get("SA") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node D(1M)   = " + firstNodeMap.get("D") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node E(10M)  = " + firstNodeMap.get("E") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node F(100M) = " + firstNodeMap.get("F") / testList.size() + " (ms)");
+        writeToFile("[FIRST RUN] #TOTAL CALLS = " + firstRun.size() + " Total Average Latency = " + firstRunTotal / firstRun.size() + " (ms)");
+        writeToFile("===========================================================================================================================");
+//        writeToFile("========================================== SECOND RUN TEST SUMMARY==========================================================");
+//        writeToFile("[RESULT] Average Latency Under Node A(1K)   = " + secondNodeMap.get("LA") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node B(10K)  = " + secondNodeMap.get("MA") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node C(100K) = " + secondNodeMap.get("SA") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node D(1M)   = " + secondNodeMap.get("D") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node E(10M)  = " + secondNodeMap.get("E") / testList.size() + " (ms)");
+//        writeToFile("[RESULT] Average Latency Under Node F(100M) = " + secondNodeMap.get("F") / testList.size() + " (ms)");
+//        writeToFile("[SECOND RUN] #TOTAL CALLS = " + secondRun.size() + " Total Average Latency = " + secondRunTotal / secondRun.size() + " (ms)");
+        writeToFile("===========================================================================================================================");
+        fileWriter.close();
+    }
+
+
+}
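
Note on the latency tests in this patch: the create, delete and update tests each repeat the same timing and averaging logic, and each divides by the run size without checking for an empty run. The sketch below shows one way that logic could be factored out. It is not part of the commit: LatencyRecorder and all of its methods are hypothetical names chosen for illustration, and it uses System.nanoTime() where the tests use System.currentTimeMillis().

```java
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the commit: collects per-call latencies
// and writes a summary line in the same shape as the reports in the tests.
class LatencyRecorder {
    private final List<Long> latenciesMs = new ArrayList<Long>();

    // Times one action with the monotonic clock and records the result in ms.
    long time(Runnable action) {
        long start = System.nanoTime();
        action.run();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        latenciesMs.add(elapsedMs);
        return elapsedMs;
    }

    // Records a latency that was measured elsewhere.
    void record(long latencyMs) {
        latenciesMs.add(latencyMs);
    }

    int calls() {
        return latenciesMs.size();
    }

    // Integer average in ms; returns 0 for an empty run instead of dividing by zero.
    long averageMs() {
        if (latenciesMs.isEmpty()) {
            return 0;
        }
        long total = 0;
        for (long l : latenciesMs) {
            total += l;
        }
        return total / latenciesMs.size();
    }

    // Writes one summary line, e.g. for label "FIRST RUN".
    void writeSummary(Writer out, String label) throws IOException {
        out.write("[" + label + "] #TOTAL CALLS = " + calls()
                + " Total Average Latency = " + averageMs() + " (ms)\n");
    }
}
```

A test could wrap each testClient call in recorder.time(...) and pass its FileWriter to writeSummary. Whether that suits the launchpad integration test setup is an assumption.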

-- 
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.