Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC

[11/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java b/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java
new file mode 100644
index 0000000..9a5f36d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java
@@ -0,0 +1,304 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.josql;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.log4j.Logger;
+import org.josql.Query;
+import org.josql.QueryExecutionException;
+import org.josql.QueryParseException;
+import org.josql.QueryResults;
+
+
+public class ClusterJosqlQueryProcessor
+{
+  public static final String PARTITIONS = "PARTITIONS";
+  public static final String FLATTABLE = ".Table";
+
+  HelixManager _manager;
+  private static Logger _logger = Logger.getLogger(ClusterJosqlQueryProcessor.class);
+
+  public ClusterJosqlQueryProcessor(HelixManager manager)
+  {
+    _manager = manager;
+  }
+
+  String parseFromTarget(String sql)
+  {
+    // We need to find out the "FROM" target, and replace it with liveInstances
+    // / partitions etc
+    int fromIndex = sql.indexOf("FROM");
+    if (fromIndex == -1)
+    {
+      throw new HelixException("Query must contain FROM target. Query: " + sql);
+    }
+    // Per JoSQL, in "SELECT ... FROM <target>" the target must be an object
+    // class that corresponds to a "table row".
+    // In our case, the row is always a ZNRecord.
+
+    int nextSpace = sql.indexOf(" ", fromIndex);
+    while (sql.charAt(nextSpace) == ' ')
+    {
+      nextSpace++;
+    }
+    int nextnextSpace = sql.indexOf(" ", nextSpace);
+    if (nextnextSpace == -1)
+    {
+      nextnextSpace = sql.length();
+    }
+    String fromTarget = sql.substring(nextSpace, nextnextSpace).trim();
+
+    if (fromTarget.length() == 0)
+    {
+      throw new HelixException("FROM target in the query cannot be empty. Query: " + sql);
+    }
+    return fromTarget;
+  }
+
+  public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
+      List<Object> additionalFunctionHandlers, List queryTarget) throws QueryParseException,
+      QueryExecutionException
+  {
+    Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
+
+    josqlQuery.parse(josql);
+    QueryResults qr = josqlQuery.execute(queryTarget);
+
+    return qr.getResults();
+  }
+
+  Query prepareQuery(Map<String, Object> bindVariables, List<Object> additionalFunctionHandlers)
+  {
+    // DataAccessor accessor = _manager.getDataAccessor();
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    
+    // Get all the ZNRecords in the cluster and set them as bind variables
+    Builder keyBuilder = accessor.keyBuilder();
+//    List<ZNRecord> instanceConfigs = accessor.getChildValues(PropertyType.CONFIGS,
+//        ConfigScopeProperty.PARTICIPANT.toString());
+    
+    List<ZNRecord> instanceConfigs = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
+
+    List<ZNRecord> liveInstances = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.liveInstances()));
+    List<ZNRecord> stateModelDefs = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.stateModelDefs()));
+    
+    // Idealstates are stored in a map from resource name to idealState ZNRecord
+    List<ZNRecord> idealStateList = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.idealStates()));
+    
+    Map<String, ZNRecord> idealStatesMap = new HashMap<String, ZNRecord>();
+    for (ZNRecord idealState : idealStateList)
+    {
+      idealStatesMap.put(idealState.getId(), idealState);
+    }
+    // Build the partition list used when selecting partitions
+    List<ZNRecord> partitions = new ArrayList<ZNRecord>();
+    for (ZNRecord idealState : idealStateList)
+    {
+      for (String partitionName : idealState.getMapFields().keySet())
+      {
+        partitions.add(new ZNRecord(partitionName));
+      }
+    }
+    
+    List<ZNRecord> externalViewList = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.externalViews()));
+    // ExternalViews are stored in a map from resource name to externalView
+    // ZNRecord
+    Map<String, ZNRecord> externalViewMap = new HashMap<String, ZNRecord>();
+    for (ZNRecord externalView : externalViewList)
+    {
+      externalViewMap.put(externalView.getId(), externalView);
+    }
+    // Map from instance name to a map from resource to current state ZNRecord
+    Map<String, Map<String, ZNRecord>> currentStatesMap = new HashMap<String, Map<String, ZNRecord>>();
+    // Map from instance name to a list of combined flat ZNRecordRow
+    Map<String, List<ZNRecordRow>> flatCurrentStateMap = new HashMap<String, List<ZNRecordRow>>();
+
+    for (ZNRecord instance : liveInstances)
+    {
+      String host = instance.getId();
+      String sessionId = instance.getSimpleField(LiveInstanceProperty.SESSION_ID.toString());
+      Map<String, ZNRecord> currentStates = new HashMap<String, ZNRecord>();
+      List<ZNRecord> instanceCurrentStateList = new ArrayList<ZNRecord>();
+      for (ZNRecord idealState : idealStateList)
+      {
+        String resourceName = idealState.getId();
+        
+        HelixProperty property = accessor.getProperty(keyBuilder.currentState(host, sessionId, resourceName));
+        ZNRecord currentState = null;
+        if (property == null)
+        {
+          _logger.warn("Resource " + resourceName + " has null currentState");
+          currentState = new ZNRecord(resourceName);
+        } else {
+          currentState = property.getRecord();
+        }
+        currentStates.put(resourceName, currentState);
+        instanceCurrentStateList.add(currentState);
+      }
+      currentStatesMap.put(host, currentStates);
+      flatCurrentStateMap.put(host, ZNRecordRow.flatten(instanceCurrentStateList));
+    }
+    Query josqlQuery = new Query();
+
+    // Set the default bind variables
+    josqlQuery.setVariable(
+        PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString(),
+        instanceConfigs);
+    josqlQuery.setVariable(PropertyType.IDEALSTATES.toString(), idealStatesMap);
+    josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString(), liveInstances);
+    josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString(), stateModelDefs);
+    josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString(), externalViewMap);
+    josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString(), currentStatesMap);
+    josqlQuery.setVariable(PARTITIONS, partitions);
+
+    // Flat version of ZNRecords
+    josqlQuery.setVariable(
+        PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString()
+            + FLATTABLE,
+        ZNRecordRow.flatten(instanceConfigs));
+    josqlQuery.setVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE,
+        ZNRecordRow.flatten(idealStateList));
+    josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE,
+        ZNRecordRow.flatten(liveInstances));
+    josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE,
+        ZNRecordRow.flatten(stateModelDefs));
+    josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE,
+        ZNRecordRow.flatten(externalViewList));
+    josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString() + FLATTABLE,
+        flatCurrentStateMap.values());
+    josqlQuery.setVariable(PARTITIONS + FLATTABLE, ZNRecordRow.flatten(partitions));
+    // Set additional bind variables
+    if (bindVariables != null)
+    {
+      for (String key : bindVariables.keySet())
+      {
+        josqlQuery.setVariable(key, bindVariables.get(key));
+      }
+    }
+
+    josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
+    josqlQuery.addFunctionHandler(new ZNRecordRow());
+    josqlQuery.addFunctionHandler(new Integer(0));
+    if (additionalFunctionHandlers != null)
+    {
+      for (Object functionHandler : additionalFunctionHandlers)
+      {
+        josqlQuery.addFunctionHandler(functionHandler);
+      }
+    }
+    return josqlQuery;
+  }
+
+  public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
+      List<Object> additionalFunctionHandlers) throws QueryParseException, QueryExecutionException
+  {
+    Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
+
+    // Per JoSQL, in "SELECT ... FROM <target>" the target must be an object
+    // class that corresponds to a "table row", while the table itself (a list
+    // of objects) is supplied to the query via query.execute(List<Object>).
+    // In our case, the row is always a ZNRecord, whereas in SQL the FROM
+    // target is a "table name". So we map the table name in the query to the
+    // right bind variable and rewrite the FROM target to the row class.
+
+    String fromTargetString = parseFromTarget(josql);
+
+    List fromTargetList = null;
+    Object fromTarget = null;
+    if (fromTargetString.equalsIgnoreCase(PARTITIONS))
+    {
+      fromTarget = josqlQuery.getVariable(PARTITIONS.toString());
+    } else if (fromTargetString.equalsIgnoreCase(PropertyType.LIVEINSTANCES.toString()))
+    {
+      fromTarget = josqlQuery.getVariable(PropertyType.LIVEINSTANCES.toString());
+    } else if (fromTargetString.equalsIgnoreCase(PropertyType.CONFIGS.toString() + "/"
+        + ConfigScopeProperty.PARTICIPANT.toString()))
+    {
+      fromTarget = josqlQuery.getVariable(PropertyType.CONFIGS.toString() + "/"
+          + ConfigScopeProperty.PARTICIPANT.toString());
+    } else if (fromTargetString.equalsIgnoreCase(PropertyType.STATEMODELDEFS.toString()))
+    {
+      fromTarget = josqlQuery.getVariable(PropertyType.STATEMODELDEFS.toString());
+    } else if (fromTargetString.equalsIgnoreCase(PropertyType.EXTERNALVIEW.toString()))
+    {
+      fromTarget = josqlQuery.getVariable(PropertyType.EXTERNALVIEW.toString());
+    } 
+    else if (fromTargetString.equalsIgnoreCase(PropertyType.IDEALSTATES.toString()))
+    {
+      fromTarget = josqlQuery.getVariable(PropertyType.IDEALSTATES.toString());
+    }
+    
+    else if (fromTargetString.equalsIgnoreCase(PARTITIONS + FLATTABLE))
+    {
+      fromTarget = josqlQuery.getVariable(PARTITIONS.toString() + FLATTABLE);
+    } else if (fromTargetString.equalsIgnoreCase(PropertyType.LIVEINSTANCES.toString() + FLATTABLE))
+    {
+      fromTarget = josqlQuery.getVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE);
+    } else if (fromTargetString.equalsIgnoreCase(PropertyType.CONFIGS.toString() + "/"
+        + ConfigScopeProperty.PARTICIPANT.toString()
+        + FLATTABLE))
+    {
+      fromTarget = josqlQuery.getVariable(PropertyType.CONFIGS.toString() + "/"
+          + ConfigScopeProperty.PARTICIPANT.toString() + FLATTABLE);
+    } else if (fromTargetString
+        .equalsIgnoreCase(PropertyType.STATEMODELDEFS.toString() + FLATTABLE))
+    {
+      fromTarget = josqlQuery.getVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE);
+    } else if (fromTargetString.equalsIgnoreCase(PropertyType.EXTERNALVIEW.toString() + FLATTABLE))
+    {
+      fromTarget = josqlQuery.getVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE);
+    }
+    else if (fromTargetString.equalsIgnoreCase(PropertyType.IDEALSTATES.toString() + FLATTABLE))
+    {
+      fromTarget = josqlQuery.getVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE);
+    }
+    else
+    {
+      throw new HelixException(
+          "Unknown query target "
+              + fromTargetString
+              + ". Target should be PARTITIONS, LIVEINSTANCES, CONFIGS, STATEMODELDEFS, IDEALSTATES, EXTERNALVIEW, and corresponding flat Tables");
+    }
+
+    fromTargetList = fromTargetString.endsWith(FLATTABLE) ? ((List<ZNRecordRow>) fromTarget)
+        : ((List<ZNRecord>) fromTarget);
+
+    // Per JoSQL, the FROM target must be the row class, so rewrite the table
+    // name in the query to ZNRecordRow for the flattened tables and to
+    // ZNRecord otherwise.
+    josql = josql.replaceFirst(
+        fromTargetString,
+        fromTargetString.endsWith(FLATTABLE) ? ZNRecordRow.class.getName() : ZNRecord.class
+            .getName());
+    josqlQuery.parse(josql);
+    QueryResults qr = josqlQuery.execute(fromTargetList);
+    return qr.getResults();
+  }
+}
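
For context, a minimal usage sketch of the processor above, assuming a connected HelixManager; the cluster name, instance name and ZooKeeper address below are made-up values, not part of this patch:

import java.util.List;

import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.josql.ClusterJosqlQueryProcessor;

public class JosqlQueryExample
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical setup: cluster name, instance name and ZK address are examples.
    HelixManager manager = HelixManagerFactory.getZKHelixManager(
        "test-cluster", "querier", InstanceType.SPECTATOR, "localhost:2181");
    manager.connect();
    try
    {
      ClusterJosqlQueryProcessor processor = new ClusterJosqlQueryProcessor(manager);
      // The FROM target LIVEINSTANCES is resolved to its bind variable and the
      // query is rewritten to select from the ZNRecord row class before execution.
      List<Object> results =
          processor.runJoSqlQuery("SELECT id FROM LIVEINSTANCES", null, null);
      for (Object row : results)
      {
        System.out.println(row);
      }
    }
    finally
    {
      manager.disconnect();
    }
  }
}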

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java b/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java
new file mode 100644
index 0000000..34c5c75
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.josql;
+
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.josql.functions.AbstractFunctionHandler;
+
+
+
+public class ZNRecordJosqlFunctionHandler extends AbstractFunctionHandler
+{
+  public boolean hasSimpleField(ZNRecord record, String fieldName, String field)
+  {
+    if(!record.getSimpleFields().containsKey(fieldName))
+    {
+      return false;
+    }
+    return field.equals(record.getSimpleField(fieldName));
+  }
+  
+  public boolean hasListField(ZNRecord record, String fieldName, String field)
+  {
+    if(!record.getListFields().containsKey(fieldName))
+    {
+      return false;
+    }
+    return record.getListField(fieldName).contains(field);
+  }
+  
+  public boolean hasMapFieldValue(ZNRecord record, String fieldName, String mapKey, String mapValue)
+  {
+    if(!record.getMapFields().containsKey(fieldName))
+    {
+      return false;
+    }
+    if(record.getMapField(fieldName).containsKey(mapKey))
+    {
+      return record.getMapField(fieldName).get(mapKey).equals(mapValue);
+    }
+    return false;
+  }
+  
+  public boolean hasMapFieldKey(ZNRecord record, String fieldName, String mapKey)
+  {
+    if(!record.getMapFields().containsKey(fieldName))
+    {
+      return false;
+    }
+    return record.getMapField(fieldName).containsKey(mapKey);
+  }
+  
+  public String getMapFieldValue(ZNRecord record, String fieldName, String mapKey)
+  {
+    if(record.getMapFields().containsKey(fieldName))
+    {
+      return record.getMapField(fieldName).get(mapKey);
+    }
+    return null;
+  }
+  
+  public String getSimpleFieldValue(ZNRecord record, String key)
+  {
+    return record.getSimpleField(key);
+  }
+  
+  public ZNRecord getZNRecordFromMap(Map<String, ZNRecord> recordMap, String key)
+  {
+    return recordMap.get(key);
+  }
+  
+  public ZNRecord getZNRecordFromMap(Map<String, Map<String, ZNRecord>> recordMap, String key, String subKey)
+  {
+    return recordMap.get(key).get(subKey);
+  }
+}
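
These handler methods become callable from the WHERE clause of a JoSQL query once the handler is registered, as ClusterJosqlQueryProcessor does above. A stand-alone sketch of the same checks invoked directly; the record id and field values are illustrative:

import org.apache.helix.ZNRecord;
import org.apache.helix.josql.ZNRecordJosqlFunctionHandler;

public class FunctionHandlerExample
{
  public static void main(String[] args)
  {
    // Hypothetical record: the id and field values are examples only.
    ZNRecord record = new ZNRecord("localhost_12918");
    record.setSimpleField("HELIX_VERSION", "0.6.0");

    ZNRecordJosqlFunctionHandler handler = new ZNRecordJosqlFunctionHandler();
    System.out.println(handler.hasSimpleField(record, "HELIX_VERSION", "0.6.0")); // true
    System.out.println(handler.hasMapFieldKey(record, "TestDB_0", "localhost_12918")); // false
  }
}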

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java b/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java
new file mode 100644
index 0000000..462b690
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java
@@ -0,0 +1,195 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.josql;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+
+
+/**
+ * A normalized (flattened) form of a ZNRecord.
+ * */
+public class ZNRecordRow
+{
+  // "Field names" in the flattened ZNRecord
+  public static final String SIMPLE_KEY = "simpleKey";
+  public static final String SIMPLE_VALUE = "simpleValue";
+  
+  public static final String LIST_KEY = "listKey";
+  public static final String LIST_VALUE = "listValue";
+  public static final String LIST_VALUE_INDEX = "listValueIndex";
+  
+  public static final String MAP_KEY = "mapKey";
+  public static final String MAP_SUBKEY = "mapSubKey";
+  public static final String MAP_VALUE = "mapValue";
+  public static final String ZNRECORD_ID = "recordId";
+  // ZNRECORD path ? 
+  
+  
+  final Map<String, String> _rowDataMap = new HashMap<String, String>();
+  
+  public ZNRecordRow()
+  {
+    _rowDataMap.put(SIMPLE_KEY, "");
+    _rowDataMap.put(SIMPLE_VALUE, "");
+    _rowDataMap.put(LIST_KEY, "");
+    _rowDataMap.put(LIST_VALUE, "");
+    _rowDataMap.put(LIST_VALUE_INDEX, "");
+    _rowDataMap.put(MAP_KEY, "");
+    _rowDataMap.put(MAP_SUBKEY, "");
+    _rowDataMap.put(MAP_VALUE, "");
+    _rowDataMap.put(ZNRECORD_ID, "");
+  }
+  
+  public String getField(String rowField)
+  {
+    return _rowDataMap.get(rowField);
+  }
+  
+  public void putField(String fieldName, String fieldValue)
+  {
+    _rowDataMap.put(fieldName, fieldValue);
+  }
+  public String getListValueIndex()
+  {
+    return getField(LIST_VALUE_INDEX);
+  }
+  public String getSimpleKey()
+  {
+    return getField(SIMPLE_KEY);
+  }
+  
+  public String getSimpleValue()
+  {
+    return getField(SIMPLE_VALUE);
+  }
+  
+  public String getListKey()
+  {
+    return getField(LIST_KEY);
+  }
+  
+  public String getListValue()
+  {
+    return getField(LIST_VALUE);
+  }
+  
+  public String getMapKey()
+  {
+    return getField(MAP_KEY);
+  }
+  
+  public String getMapSubKey()
+  {
+    return getField(MAP_SUBKEY);
+  }
+  
+  public String getMapValue()
+  {
+    return getField(MAP_VALUE);
+  }
+  
+  public String getRecordId()
+  {
+    return getField(ZNRECORD_ID);
+  }
+  
+  /* Josql function handlers */
+  public static String getField(ZNRecordRow row, String rowField)
+  {
+    return row.getField(rowField);
+  }
+  
+  public static List<ZNRecordRow> convertSimpleFields(ZNRecord record)
+  {
+    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+    for(String key : record.getSimpleFields().keySet())
+    {
+      ZNRecordRow row = new ZNRecordRow();
+      row.putField(ZNRECORD_ID, record.getId());
+      row.putField(SIMPLE_KEY, key);
+      row.putField(SIMPLE_VALUE, record.getSimpleField(key));
+      result.add(row);
+    }
+    return result;
+  }
+  
+  public static List<ZNRecordRow> convertListFields(ZNRecord record)
+  {
+    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+    for(String key : record.getListFields().keySet())
+    {
+      int order = 0;
+      for(String value : record.getListField(key))
+      {
+        ZNRecordRow row = new ZNRecordRow();
+        row.putField(ZNRECORD_ID, record.getId());
+        row.putField(LIST_KEY, key);
+        row.putField(LIST_VALUE, value);
+        row.putField(LIST_VALUE_INDEX, ""+order);
+        order++;
+        result.add(row);
+      }
+    }
+    return result;
+  }
+  
+  public static List<ZNRecordRow> convertMapFields(ZNRecord record)
+  {
+    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+    for(String key0 : record.getMapFields().keySet())
+    {
+      for(String key1 : record.getMapField(key0).keySet())
+      {
+        ZNRecordRow row = new ZNRecordRow();
+        row.putField(ZNRECORD_ID, record.getId());
+        row.putField(MAP_KEY, key0);
+        row.putField(MAP_SUBKEY, key1);
+        row.putField(MAP_VALUE, record.getMapField(key0).get(key1));
+        result.add(row);
+      }
+    }
+    return result;
+  }
+  
+  public static List<ZNRecordRow> flatten(ZNRecord record)
+  {
+    List<ZNRecordRow> result = convertMapFields(record);
+    result.addAll(convertListFields(record));
+    result.addAll(convertSimpleFields(record));
+    return result;
+  }
+  
+  public static List<ZNRecordRow> flatten(Collection<ZNRecord> recordList)
+  {
+    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+    for(ZNRecord record : recordList)
+    {
+      result.addAll(flatten(record));
+    }
+    return result;
+  }
+  
+  public static List<ZNRecordRow> getRowListFromMap(Map<String, List<ZNRecordRow>> rowMap, String key)
+  {
+    return rowMap.get(key);
+  }
+}
\ No newline at end of file
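
A short sketch of the flattening above: every simple field, list entry and map sub-key of a ZNRecord becomes one ZNRecordRow. The record id and fields below are illustrative:

import java.util.Collections;
import java.util.List;

import org.apache.helix.ZNRecord;
import org.apache.helix.josql.ZNRecordRow;

public class ZNRecordRowExample
{
  public static void main(String[] args)
  {
    // Hypothetical record resembling an external view entry.
    ZNRecord record = new ZNRecord("TestDB");
    record.setSimpleField("STATE_MODEL_DEF_REF", "MasterSlave");
    record.setMapField("TestDB_0",
        Collections.singletonMap("localhost_12918", "MASTER"));

    // One ZNRecordRow per simple field, list entry and map sub-key.
    List<ZNRecordRow> rows = ZNRecordRow.flatten(record);
    for (ZNRecordRow row : rows)
    {
      System.out.println(row.getRecordId() + " simple:" + row.getSimpleKey()
          + " map:" + row.getMapKey() + "/" + row.getMapSubKey() + "=" + row.getMapValue());
    }
  }
}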

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/josql/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/package-info.java b/helix-core/src/main/java/org/apache/helix/josql/package-info.java
new file mode 100644
index 0000000..d0e7c52
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/josql/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * JoSQL query processor for Helix
+ * 
+ */
+package org.apache.helix.josql;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/DynamicFileHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/DynamicFileHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/file/DynamicFileHelixManager.java
new file mode 100644
index 0000000..5773841
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/DynamicFileHelixManager.java
@@ -0,0 +1,460 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.tools.PropertiesReader;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+
+@Deprecated
+public class DynamicFileHelixManager implements HelixManager
+{
+  private static final Logger LOG = Logger.getLogger(DynamicFileHelixManager.class.getName());
+  private final FileDataAccessor _fileDataAccessor;
+  private final FileHelixDataAccessor _accessor;
+
+  private final String _clusterName;
+  private final InstanceType _instanceType;
+  private final String _instanceName;
+  private boolean _isConnected;
+  private final List<FileCallbackHandler> _handlers;
+  private final FileHelixAdmin _mgmtTool;
+
+  private final String _sessionId; // = "12345";
+  public static final String configFile = "configFile";
+  private final DefaultMessagingService _messagingService;
+  private final FilePropertyStore<ZNRecord> _store;
+  private final String _version;
+  private final StateMachineEngine _stateMachEngine;
+  private PropertyStore<ZNRecord> _propertyStore = null;
+
+  public DynamicFileHelixManager(String clusterName, String instanceName,
+      InstanceType instanceType, FilePropertyStore<ZNRecord> store)
+  {
+    _clusterName = clusterName;
+    _instanceName = instanceName;
+    _instanceType = instanceType;
+
+    _handlers = new ArrayList<FileCallbackHandler>();
+
+    _store = store;
+    _fileDataAccessor = new FileDataAccessor(_store, clusterName);
+    _accessor = new FileHelixDataAccessor(_store, clusterName);
+
+    _mgmtTool = new FileHelixAdmin(_store);
+    _messagingService = new DefaultMessagingService(this);
+    _sessionId = UUID.randomUUID().toString();
+    if (instanceType == InstanceType.PARTICIPANT)
+    {
+      addLiveInstance();
+      addMessageListener(_messagingService.getExecutor(), _instanceName);
+    }
+
+    _version = new PropertiesReader("cluster-manager-version.properties")
+        .getProperty("clustermanager.version");
+
+    _stateMachEngine = new HelixStateMachineEngine(this);
+
+    _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
+        _stateMachEngine);
+  }
+
+  @Override
+  public void disconnect()
+  {
+    _store.stop();
+    _messagingService.getExecutor().shutDown();
+
+    _isConnected = false;
+  }
+
+  @Override
+  public void addIdealStateChangeListener(IdealStateChangeListener listener)
+  {
+    final String path = HelixUtil.getIdealStatePath(_clusterName);
+
+    FileCallbackHandler callbackHandler = createCallBackHandler(path, listener, new EventType[] {
+        EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated }, IDEAL_STATE);
+    _handlers.add(callbackHandler);
+
+  }
+
+  @Override
+  public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener)
+  {
+    final String path = HelixUtil.getLiveInstancesPath(_clusterName);
+    FileCallbackHandler callbackHandler = createCallBackHandler(path, listener, new EventType[] {
+        EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated },
+        LIVE_INSTANCE);
+    _handlers.add(callbackHandler);
+  }
+
+  @Override
+  public void addConfigChangeListener(ConfigChangeListener listener)
+  {
+    throw new UnsupportedOperationException(
+        "addConfigChangeListener() is NOT supported by File Based cluster manager");
+  }
+
+  @Override
+  public void addMessageListener(MessageListener listener, String instanceName)
+  {
+    final String path = HelixUtil.getMessagePath(_clusterName, instanceName);
+
+    FileCallbackHandler callbackHandler = createCallBackHandler(path, listener, new EventType[] {
+        EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated },
+        ChangeType.MESSAGE);
+    _handlers.add(callbackHandler);
+
+  }
+
+  @Override
+  public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
+      String instanceName, String sessionId)
+  {
+    final String path = HelixUtil.getCurrentStateBasePath(_clusterName, instanceName) + "/"
+        + sessionId;
+
+    FileCallbackHandler callbackHandler = createCallBackHandler(path, listener, new EventType[] {
+        EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated },
+        CURRENT_STATE);
+
+    _handlers.add(callbackHandler);
+  }
+
+  @Override
+  public void addExternalViewChangeListener(ExternalViewChangeListener listener)
+  {
+    throw new UnsupportedOperationException(
+        "addExternalViewChangeListener() is NOT supported by File Based cluster manager");
+  }
+
+  @Override
+  public DataAccessor getDataAccessor()
+  {
+    return _fileDataAccessor;
+  }
+
+  @Override
+  public String getClusterName()
+  {
+    return _clusterName;
+  }
+
+  @Override
+  public String getInstanceName()
+  {
+    return _instanceName;
+  }
+
+  @Override
+  public void connect()
+  {
+    if (!isClusterSetup(_clusterName))
+    {
+      throw new HelixException("Initial cluster structure is not set up for cluster:"
+          + _clusterName);
+    }
+    _messagingService.onConnected();
+    _store.start();
+    _isConnected = true;
+  }
+
+  @Override
+  public String getSessionId()
+  {
+    return _sessionId;
+  }
+
+  @Override
+  public boolean isConnected()
+  {
+    return _isConnected;
+  }
+
+  private boolean isClusterSetup(String clusterName)
+  {
+    if (clusterName == null || _store == null)
+    {
+      return false;
+    }
+
+    boolean isValid = _store.exists(PropertyPathConfig.getPath(PropertyType.IDEALSTATES,
+        clusterName))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+            ConfigScopeProperty.CLUSTER.toString()))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+            ConfigScopeProperty.PARTICIPANT.toString()))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+            ConfigScopeProperty.RESOURCE.toString()))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES, clusterName))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.INSTANCES, clusterName))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.CONTROLLER, clusterName))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS, clusterName))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER,
+            clusterName))
+        && _store.exists(PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName));
+
+    return isValid;
+  }
+
+  private boolean isInstanceSetup()
+  {
+    if (_instanceType == InstanceType.PARTICIPANT
+        || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
+    {
+      boolean isValid = _store.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
+          _clusterName, ConfigScopeProperty.PARTICIPANT.toString(), _instanceName))
+          && _store.exists(PropertyPathConfig.getPath(PropertyType.MESSAGES, _clusterName,
+              _instanceName))
+          && _store.exists(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName,
+              _instanceName))
+          && _store.exists(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES, _clusterName,
+              _instanceName))
+          && _store.exists(PropertyPathConfig.getPath(PropertyType.ERRORS, _clusterName,
+              _instanceName));
+
+      return isValid;
+    }
+    return true;
+  }
+
+  private void addLiveInstance()
+  {
+    if (!isClusterSetup(_clusterName))
+    {
+      throw new HelixException("Initial cluster structure is not set up for cluster:"
+          + _clusterName);
+    }
+
+    if (!isInstanceSetup())
+    {
+      throw new HelixException("Instance is not configured for instance:" + _instanceName
+          + " instanceType:" + _instanceType);
+    }
+
+    LiveInstance liveInstance = new LiveInstance(_instanceName);
+    liveInstance.setSessionId(_sessionId);
+//    _fileDataAccessor.setProperty(PropertyType.LIVEINSTANCES, liveInstance.getRecord(),
+//        _instanceName);
+    
+    Builder keyBuilder = _accessor.keyBuilder();
+    _accessor.setProperty(keyBuilder.liveInstance(_instanceName), liveInstance);
+
+  }
+
+  @Override
+  public long getLastNotificationTime()
+  {
+    return 0;
+  }
+
+  @Override
+  public void addControllerListener(ControllerChangeListener listener)
+  {
+    throw new UnsupportedOperationException(
+        "addControllerListener() is NOT supported by File Based cluster manager");
+  }
+
+  @Override
+  public boolean removeListener(Object listener)
+  {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  private FileCallbackHandler createCallBackHandler(String path, Object listener,
+      EventType[] eventTypes, ChangeType changeType)
+  {
+    if (listener == null)
+    {
+      throw new HelixException("Listener cannot be null");
+    }
+    return new FileCallbackHandler(this, path, listener, eventTypes, changeType);
+  }
+
+  @Override
+  public HelixAdmin getClusterManagmentTool()
+  {
+    return _mgmtTool;
+  }
+
+  private void checkConnected()
+  {
+    if (!isConnected())
+    {
+      throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
+    }
+  }
+
+  @Override
+  public PropertyStore<ZNRecord> getPropertyStore()
+  {
+    checkConnected();
+
+    if (_propertyStore == null)
+    {
+      String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
+
+      String propertyStoreRoot = _store.getPropertyRootNamespace() + path;
+      _propertyStore =
+          new FilePropertyStore<ZNRecord>(new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
+                                          propertyStoreRoot,
+                                          new PropertyJsonComparator<ZNRecord>(ZNRecord.class));
+    }
+    return _propertyStore;
+  }
+
+  @Override
+  public ClusterMessagingService getMessagingService()
+  {
+    return _messagingService;
+  }
+
+  @Override
+  public ParticipantHealthReportCollector getHealthReportCollector()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public InstanceType getInstanceType()
+  {
+    return _instanceType;
+  }
+
+  @Override
+  public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
+      throws Exception
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public String getVersion()
+  {
+    return _version;
+  }
+
+  @Override
+  public StateMachineEngine getStateMachineEngine()
+  {
+    return _stateMachEngine;
+  }
+
+  @Override
+  public boolean isLeader()
+  {
+    if (_instanceType != InstanceType.CONTROLLER)
+    {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public ConfigAccessor getConfigAccessor()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void startTimerTasks()
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void stopTimerTasks()
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public HelixDataAccessor getHelixDataAccessor()
+  {
+    return _accessor;
+  }
+
+  @Override
+  public void addPreConnectCallback(PreConnectCallback callback)
+  {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+}
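
A construction sketch for the deprecated file-based manager above. The store root, cluster and instance names are made-up, and the cluster file layout must already exist, since the PARTICIPANT constructor path calls addLiveInstance():

import org.apache.helix.InstanceType;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.file.DynamicFileHelixManager;
import org.apache.helix.store.PropertyJsonComparator;
import org.apache.helix.store.PropertyJsonSerializer;
import org.apache.helix.store.file.FilePropertyStore;

public class FileManagerExample
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical file-store root; same serializer/comparator pair that
    // getPropertyStore() above uses.
    FilePropertyStore<ZNRecord> store = new FilePropertyStore<ZNRecord>(
        new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
        "/tmp/file-helix-test",
        new PropertyJsonComparator<ZNRecord>(ZNRecord.class));

    // The cluster layout for "test-cluster" must already exist in the store,
    // otherwise the PARTICIPANT constructor fails in addLiveInstance().
    DynamicFileHelixManager manager = new DynamicFileHelixManager(
        "test-cluster", "localhost_12918", InstanceType.PARTICIPANT, store);
    manager.connect();
    // ... register listeners, do work, then:
    manager.disconnect();
  }
}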

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/FileCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/FileCallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/file/FileCallbackHandler.java
new file mode 100644
index 0000000..12814a2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/FileCallbackHandler.java
@@ -0,0 +1,293 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
+import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
+import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixManager;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+
+// TODO remove code duplication: CallbackHandler and CallbackHandlerForFile
+@Deprecated
+public class FileCallbackHandler implements PropertyChangeListener<ZNRecord>
+{
+
+  private static Logger LOG = Logger.getLogger(FileCallbackHandler.class);
+
+  private final String _path;
+  private final Object _listener;
+  private final EventType[] _eventTypes;
+  private final ChangeType _changeType;
+//  private final FileDataAccessor _accessor;
+  private final FileHelixDataAccessor _accessor;
+  private final AtomicLong lastNotificationTimeStamp;
+  private final HelixManager _manager;
+  private final FilePropertyStore<ZNRecord> _store;
+
+  public FileCallbackHandler(HelixManager manager, String path, Object listener,
+      EventType[] eventTypes, ChangeType changeType)
+  {
+    this._manager = manager;
+    this._accessor = (FileHelixDataAccessor) manager.getHelixDataAccessor();
+    this._path = path;
+    this._listener = listener;
+    this._eventTypes = eventTypes;
+    this._changeType = changeType;
+    this._store = (FilePropertyStore<ZNRecord>) _accessor.getStore();
+    lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
+
+    init();
+  }
+
+  public Object getListener()
+  {
+    return _listener;
+  }
+
+  public Object getPath()
+  {
+    return _path;
+  }
+
+  public void invoke(NotificationContext changeContext) throws Exception
+  {
+    // This allows the listener to work with one change at a time
+    synchronized (_listener)
+    {
+      if (LOG.isDebugEnabled())
+      {
+        LOG.debug(Thread.currentThread().getId() + " START:INVOKE "
+            + changeContext.getPathChanged() + " listener:"
+            + _listener.getClass().getCanonicalName());
+      }
+      
+      Builder keyBuilder = _accessor.keyBuilder();
+      
+      if (_changeType == IDEAL_STATE)
+      {
+        // System.err.println("ideal state change");
+        IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
+        subscribeForChanges(changeContext, true, true);
+        List<IdealState> idealStates = _accessor.getChildValues(keyBuilder.idealStates());
+        idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
+
+      } else if (_changeType == CONFIG)
+      {
+
+        ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
+        subscribeForChanges(changeContext, true, true);
+        List<InstanceConfig> configs = _accessor.getChildValues(keyBuilder.instanceConfigs());
+        configChangeListener.onConfigChange(configs, changeContext);
+
+      } else if (_changeType == LIVE_INSTANCE)
+      {
+        LiveInstanceChangeListener liveInstanceChangeListener = (LiveInstanceChangeListener) _listener;
+        subscribeForChanges(changeContext, true, false);
+        List<LiveInstance> liveInstances = _accessor.getChildValues(keyBuilder.liveInstances());
+        liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
+
+      } else if (_changeType == CURRENT_STATE)
+      {
+        CurrentStateChangeListener currentStateChangeListener;
+        currentStateChangeListener = (CurrentStateChangeListener) _listener;
+        subscribeForChanges(changeContext, true, true);
+        String instanceName = HelixUtil.getInstanceNameFromPath(_path);
+        String[] pathParts = _path.split("/");
+        List<CurrentState> currentStates = _accessor.getChildValues(keyBuilder.currentStates(instanceName, pathParts[pathParts.length - 1]));
+        currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
+
+      } else if (_changeType == MESSAGE)
+      {
+        MessageListener messageListener = (MessageListener) _listener;
+        subscribeForChanges(changeContext, true, false);
+        String instanceName = _manager.getInstanceName();
+        List<Message> messages = _accessor.getChildValues(keyBuilder.messages(instanceName));
+        messageListener.onMessage(instanceName, messages, changeContext);
+      } else if (_changeType == EXTERNAL_VIEW)
+      {
+        ExternalViewChangeListener externalViewListener = (ExternalViewChangeListener) _listener;
+        subscribeForChanges(changeContext, true, true);
+        List<ExternalView> externalViewList = _accessor.getChildValues(keyBuilder.externalViews());
+        externalViewListener.onExternalViewChange(externalViewList, changeContext);
+      } else if (_changeType == ChangeType.CONTROLLER)
+      {
+        ControllerChangeListener controllerChangelistener = (ControllerChangeListener) _listener;
+        subscribeForChanges(changeContext, true, false);
+        controllerChangelistener.onControllerChange(changeContext);
+      }
+
+      if (LOG.isDebugEnabled())
+      {
+        LOG.debug(Thread.currentThread().getId() + " END:INVOKE " + changeContext.getPathChanged()
+            + " listener:" + _listener.getClass().getCanonicalName());
+      }
+    }
+  }
+
+  private void subscribeForChanges(NotificationContext changeContext, boolean watchParent,
+      boolean watchChild)
+  {
+    if (changeContext.getType() == NotificationContext.Type.INIT)
+    {
+      try
+      {
+        // _accessor.subscribeForPropertyChange(_path, this);
+        _store.subscribeForPropertyChange(_path, this);
+      } catch (PropertyStoreException e)
+      {
+        LOG.error("fail to subscribe for changes" + "\nexception:" + e);
+      }
+    }
+  }
+
+  public EventType[] getEventTypes()
+  {
+    return _eventTypes;
+  }
+
+  // this will invoke the listener so that it sets up the initial values from
+  // the file property store if any exists
+  public void init()
+  {
+    updateNotificationTime(System.nanoTime());
+    try
+    {
+      NotificationContext changeContext = new NotificationContext(_manager);
+      changeContext.setType(NotificationContext.Type.INIT);
+      invoke(changeContext);
+    } catch (Exception e)
+    {
+      // TODO handle exception
+      LOG.error("fail to init", e);
+    }
+  }
+
+  public void reset()
+  {
+    try
+    {
+      NotificationContext changeContext = new NotificationContext(_manager);
+      changeContext.setType(NotificationContext.Type.FINALIZE);
+      invoke(changeContext);
+    } catch (Exception e)
+    {
+      // TODO handle exception
+      LOG.error("fail to reset" + "\nexception:" + e);
+      // ZKExceptionHandler.getInstance().handle(e);
+    }
+  }
+
+  private void updateNotificationTime(long nanoTime)
+  {
+    long l = lastNotificationTimeStamp.get();
+    while (nanoTime > l)
+    {
+      boolean b = lastNotificationTimeStamp.compareAndSet(l, nanoTime);
+      if (b)
+      {
+        break;
+      } else
+      {
+        l = lastNotificationTimeStamp.get();
+      }
+    }
+  }
+
+  @Override
+  public void onPropertyChange(String key)
+  {
+    // debug
+    // LOG.error("on property change, key:" + key + ", path:" + _path);
+
+    try
+    {
+      if (needToNotify(key))
+      {
+        // debug
+        // System.err.println("notified on property change, key:" + key +
+        // ", path:" +
+        // path);
+
+        updateNotificationTime(System.nanoTime());
+        NotificationContext changeContext = new NotificationContext(_manager);
+        changeContext.setType(NotificationContext.Type.CALLBACK);
+        invoke(changeContext);
+      }
+    } catch (Exception e)
+    {
+      // TODO handle exception
+      // ZKExceptionHandler.getInstance().handle(e);
+      LOG.error("fail onPropertyChange", e);
+    }
+  }
+
+  private boolean needToNotify(String key)
+  {
+    boolean ret = false;
+    switch (_changeType)
+    {
+    // both child/data changes matter
+    case IDEAL_STATE:
+    case CURRENT_STATE:
+    case CONFIG:
+      ret = key.startsWith(_path);
+      break;
+    // only child changes matter
+    case LIVE_INSTANCE:
+    case MESSAGE:
+    case EXTERNAL_VIEW:
+    case CONTROLLER:
+      // ret = key.equals(_path);
+      ret = key.startsWith(_path);
+      break;
+    default:
+      break;
+    }
+
+    return ret;
+  }
+}
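
FileCallbackHandler instances are created internally when a listener is registered on the file-based manager; the handler fires once with an INIT context and then on every matching property change. A small sketch, assuming a file-based manager like the one sketched above:

import java.util.List;

import org.apache.helix.HelixManager;
import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.IdealState;

public class IdealStateWatchExample
{
  // Registering a listener on the manager wraps it in a FileCallbackHandler.
  static void watchIdealStates(HelixManager manager) throws Exception
  {
    manager.addIdealStateChangeListener(new IdealStateChangeListener()
    {
      @Override
      public void onIdealStateChange(List<IdealState> idealStates,
          NotificationContext changeContext)
      {
        System.out.println("ideal states changed: " + idealStates.size());
      }
    });
  }
}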

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/FileDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/FileDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/file/FileDataAccessor.java
new file mode 100644
index 0000000..571586c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/FileDataAccessor.java
@@ -0,0 +1,319 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.log4j.Logger;
+
+
+@Deprecated
+public class FileDataAccessor implements DataAccessor
+{
+  private static Logger LOG = Logger.getLogger(FileDataAccessor.class);
+  // store that is used by FileDataAccessor
+  private final FilePropertyStore<ZNRecord> _store;
+  private final String _clusterName;
+  private final ReadWriteLock _readWriteLock = new ReentrantReadWriteLock();
+
+  public FileDataAccessor(FilePropertyStore<ZNRecord> store, String clusterName)
+  {
+    _store = store;
+    _clusterName = clusterName;
+  }
+
+  @Override
+  public boolean setProperty(PropertyType type, HelixProperty value, String... keys)
+  {
+    return setProperty(type, value.getRecord(), keys);
+  }
+
+  @Override
+  public boolean setProperty(PropertyType type, ZNRecord value, String... keys)
+  {
+    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+    try
+    {
+      _readWriteLock.writeLock().lock();
+      _store.setProperty(path, value);
+      return true;
+    }
+    catch(PropertyStoreException e)
+    {
+      LOG.error("Fail to set cluster property clusterName: " + _clusterName +
+                " type:" + type +
+                " keys:" + keys + "\nexception: " + e);
+    }
+    finally
+    {
+      _readWriteLock.writeLock().unlock();
+    }
+    return false;
+  }
+
+  @Override
+  public boolean updateProperty(PropertyType type, HelixProperty value, String... keys)
+  {
+    return updateProperty(type, value.getRecord(), keys);
+  }
+
+  @Override
+  public boolean updateProperty(PropertyType type, ZNRecord value, String... keys)
+  {
+    try
+    {
+      _readWriteLock.writeLock().lock();
+      String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+      if (type.isUpdateOnlyOnExists())
+      {
+        updateIfExists(path, value, type.isMergeOnUpdate());
+      }
+      else
+      {
+        createOrUpdate(path, value, type.isMergeOnUpdate());
+      }
+      return true;
+    }
+    catch (PropertyStoreException e)
+    {
+      LOG.error("fail to update instance property, " +
+          " type:" + type + " keys:" + keys, e);
+    }
+    finally
+    {
+      _readWriteLock.writeLock().unlock();
+    }
+    return false;
+
+  }
+
+  @Override
+  public <T extends HelixProperty>
+    T getProperty(Class<T> clazz, PropertyType type, String... keys)
+  {
+    ZNRecord record = getProperty(type, keys);
+    if (record == null)
+    {
+      return null;
+    }
+    return HelixProperty.convertToTypedInstance(clazz, record);
+  }
+
+  @Override
+  public ZNRecord getProperty(PropertyType type, String... keys)
+  {
+    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+    try
+    {
+      _readWriteLock.readLock().lock();
+      return _store.getProperty(path);
+    }
+    catch(PropertyStoreException e)
+    {
+      LOG.error("Fail to get cluster property clusterName: " + _clusterName +
+                " type:" + type +
+                " keys:" + keys, e);
+    }
+    finally
+    {
+      _readWriteLock.readLock().unlock();
+    }
+    return null;
+  }
+
+  @Override
+  public boolean removeProperty(PropertyType type, String... keys)
+  {
+    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+    try
+    {
+      _readWriteLock.writeLock().lock();
+      _store.removeProperty(path);
+      return true;
+    }
+    catch (PropertyStoreException e)
+    {
+      LOG.error("Fail to remove instance property, "  +
+          " type:" + type + " keys:" + keys, e);
+    }
+    finally
+    {
+      _readWriteLock.writeLock().unlock();
+    }
+
+    return false;
+  }
+
+  @Override
+  public List<String> getChildNames(PropertyType type, String... keys)
+  {
+    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+    try
+    {
+      _readWriteLock.readLock().lock();
+
+      List<String> childs = _store.getPropertyNames(path);
+      return childs;
+    }
+    catch(PropertyStoreException e)
+    {
+      LOG.error("Fail to get child names:" + _clusterName +
+          " parentPath:" + path + "\nexception: " + e);
+    }
+    finally
+    {
+      _readWriteLock.readLock().unlock();
+    }
+
+    return Collections.emptyList();
+  }
+
+  @Override
+  public <T extends HelixProperty>
+    List<T> getChildValues(Class<T> clazz, PropertyType type, String... keys)
+  {
+    List<ZNRecord> list = getChildValues(type, keys);
+    return HelixProperty.convertToTypedList(clazz, list);
+  }
+
+  @Override
+  public List<ZNRecord> getChildValues(PropertyType type, String... keys)
+  {
+    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+    List<ZNRecord> records = new ArrayList<ZNRecord>();
+    try
+    {
+      _readWriteLock.readLock().lock();
+
+      List<String> childs = _store.getPropertyNames(path);
+      if (childs == null || childs.size() == 0)
+      {
+        return Collections.emptyList();
+      }
+
+      for (String child : childs)
+      {
+        ZNRecord record = _store.getProperty(child);
+        if (record != null)
+        {
+          records.add(record);
+        }
+      }
+    }
+    catch(PropertyStoreException e)
+    {
+      LOG.error("Fail to get child properties cluster:" + _clusterName +
+          " parentPath:" + path + "\nexception: " + e);
+    }
+    finally
+    {
+      _readWriteLock.readLock().unlock();
+    }
+    return records;
+  }
+
+  // HACK remove it later
+  public PropertyStore<ZNRecord> getStore()
+  {
+    return _store;
+  }
+
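+  /**
+   * Set the record at the given path only if the path already exists;
+   * otherwise this is a no-op. Note that mergeOnUpdate is not consulted
+   * here: an existing record is simply overwritten.
+   */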
+  private void updateIfExists(String path, final ZNRecord record, boolean mergeOnUpdate)
+      throws PropertyStoreException
+  {
+    if (_store.exists(path))
+    {
+      _store.setProperty(path, record);
+    }
+  }
+
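+  /**
+   * Create the record at the given path if it does not exist, otherwise
+   * update it. With mergeOnUpdate the supplied record is merged into the
+   * current data; otherwise it replaces the current data. The write is
+   * retried up to RETRYLIMIT times on failure.
+   */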
+  private void createOrUpdate(String path, final ZNRecord record, final boolean mergeOnUpdate)
+      throws PropertyStoreException
+  {
+    final int RETRYLIMIT = 3;
+    int retryCount = 0;
+    while (retryCount < RETRYLIMIT)
+    {
+      try
+      {
+        if (_store.exists(path))
+        {
+          DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
+          {
+            @Override
+            public ZNRecord update(ZNRecord currentData)
+            {
+              if(mergeOnUpdate)
+              {
+                currentData.merge(record);
+                return currentData;
+              }
+              return record;
+            }
+          };
+          _store.updatePropertyUntilSucceed(path, updater);
+
+        }
+        else
+        {
+          if(record.getDeltaList().size() > 0)
+          {
+            ZNRecord newRecord = new ZNRecord(record.getId());
+            newRecord.merge(record);
+            _store.setProperty(path, newRecord);
+          }
+          else
+          {
+            _store.setProperty(path, record);
+          }
+        }
+        break;
+      }
+      catch (Exception e)
+      {
+        retryCount++;
+        LOG.warn("Exception trying to update " + path + " (attempt " + retryCount
+            + " of " + RETRYLIMIT + ")", e);
+      }
+    }
+  }
+
+  @Override
+  public <T extends HelixProperty> Map<String, T> getChildValuesMap(Class<T> clazz,
+      PropertyType type, String... keys)
+  {
+    List<T> list = getChildValues(clazz, type, keys);
+    return HelixProperty.convertListToMap(list);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixAdmin.java
new file mode 100644
index 0000000..58bff12
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixAdmin.java
@@ -0,0 +1,486 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ConfigScope;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+
+@Deprecated
+public class FileHelixAdmin implements HelixAdmin
+{
+  private static Logger                     logger =
+                                                       Logger.getLogger(FileHelixAdmin.class);
+  private final FilePropertyStore<ZNRecord> _store;
+
+  public FileHelixAdmin(FilePropertyStore<ZNRecord> store)
+  {
+    _store = store;
+  }
+
+  @Override
+  public List<String> getClusters()
+  {
+    throw new UnsupportedOperationException("getClusters() is NOT supported by FileClusterManagementTool");
+
+  }
+
+  @Override
+  public List<String> getInstancesInCluster(String clusterName)
+  {
+    // String path = HelixUtil.getConfigPath(clusterName);
+    String path =
+        PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                   clusterName,
+                                   ConfigScopeProperty.PARTICIPANT.toString());
+
+    List<String> instanceNames = new ArrayList<String>();
+    try
+    {
+      List<String> children = _store.getPropertyNames(path);
+      if (children == null)
+      {
+        return instanceNames;
+      }
+      for (String child : children)
+      {
+        // each child is a full config path; keep only the trailing instance name
+        String instanceName = child.substring(child.lastIndexOf('/') + 1);
+        instanceNames.add(instanceName);
+      }
+      return instanceNames;
+    }
+    catch (PropertyStoreException e)
+    {
+      logger.error("Fail to getInstancesInCluster, cluster " + clusterName, e);
+    }
+
+    return null;
+  }
+
+  @Override
+  public List<String> getResourcesInCluster(String clusterName)
+  {
+    // TODO Auto-generated method stub
+    // return null;
+    throw new UnsupportedOperationException("getResourcesInCluster() is NOT supported by FileClusterManagementTool");
+
+  }
+
+  @Override
+  public void addCluster(String clusterName, boolean overwritePrevRecord)
+  {
+    String path;
+    try
+    {
+      _store.removeNamespace(clusterName);
+      _store.createPropertyNamespace(clusterName);
+      _store.createPropertyNamespace(HelixUtil.getIdealStatePath(clusterName));
+
+      // CONFIG's
+      // _store.createPropertyNamespace(HelixUtil.getConfigPath(clusterName));
+      path =
+          PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                     clusterName,
+                                     ConfigScopeProperty.CLUSTER.toString(),
+                                     clusterName);
+      _store.setProperty(path, new ZNRecord(clusterName));
+      path =
+          PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                     clusterName,
+                                     ConfigScopeProperty.PARTICIPANT.toString());
+      _store.createPropertyNamespace(path);
+      path =
+          PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                     clusterName,
+                                     ConfigScopeProperty.RESOURCE.toString());
+      _store.createPropertyNamespace(path);
+
+      // PROPERTY STORE
+      path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
+      _store.createPropertyNamespace(path);
+
+      _store.createPropertyNamespace(HelixUtil.getLiveInstancesPath(clusterName));
+      _store.createPropertyNamespace(HelixUtil.getMemberInstancesPath(clusterName));
+      _store.createPropertyNamespace(HelixUtil.getExternalViewPath(clusterName));
+      _store.createPropertyNamespace(HelixUtil.getStateModelDefinitionPath(clusterName));
+
+      StateModelConfigGenerator generator = new StateModelConfigGenerator();
+      addStateModelDef(clusterName,
+                       "MasterSlave",
+                       new StateModelDefinition(generator.generateConfigForMasterSlave()));
+
+      // controller
+      _store.createPropertyNamespace(HelixUtil.getControllerPath(clusterName));
+      path = PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName);
+      final ZNRecord emptyHistory = new ZNRecord(PropertyType.HISTORY.toString());
+      final List<String> emptyList = new ArrayList<String>();
+      emptyHistory.setListField(clusterName, emptyList);
+      _store.setProperty(path, emptyHistory);
+
+      path = PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName);
+      _store.createPropertyNamespace(path);
+
+      path =
+          PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName);
+      _store.createPropertyNamespace(path);
+
+      path = PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName);
+      _store.createPropertyNamespace(path);
+
+    }
+    catch (PropertyStoreException e)
+    {
+      logger.error("Fail to add cluster " + clusterName, e);
+    }
+
+  }
+
+  @Override
+  public void addResource(String clusterName,
+                          String resource,
+                          int numResources,
+                          String stateModelRef)
+  {
+    String idealStatePath = HelixUtil.getIdealStatePath(clusterName);
+    String resourceIdealStatePath = idealStatePath + "/" + resource;
+
+    // if (_zkClient.exists(dbIdealStatePath))
+    // {
+    // logger.warn("Skip the operation. DB ideal state directory exists:"
+    // + dbIdealStatePath);
+    // return;
+    // }
+
+    IdealState idealState = new IdealState(resource);
+    idealState.setNumPartitions(numResources);
+    idealState.setStateModelDefRef(stateModelRef);
+    idealState.setReplicas(Integer.toString(0));
+    idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+    try
+    {
+      _store.setProperty(resourceIdealStatePath, idealState.getRecord());
+    }
+    catch (PropertyStoreException e)
+    {
+      logger.error("Fail to add resource, cluster:" + clusterName + " resourceName:"
+          + resource, e);
+    }
+
+  }
+
+  @Override
+  public void addResource(String clusterName,
+                          String resource,
+                          int numResources,
+                          String stateModelRef,
+                          String idealStateMode)
+  {
+    throw new UnsupportedOperationException("ideal state mode not supported in file-based cluster manager");
+  }
+
+  @Override
+  public void addResource(String clusterName,
+                          String resource,
+                          int numResources,
+                          String stateModelRef,
+                          String idealStateMode,
+                          int bucketSize)
+  {
+    throw new UnsupportedOperationException("bucketize not supported in file-based cluster manager");
+  }
+
+  @Override
+  public void addInstance(String clusterName, InstanceConfig config)
+  {
+    String configsPath =
+        PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                   clusterName,
+                                   ConfigScopeProperty.PARTICIPANT.toString());
+    String nodeId = config.getId();
+    String nodeConfigPath = configsPath + "/" + nodeId;
+
+    try
+    {
+      _store.setProperty(nodeConfigPath, config.getRecord());
+      _store.createPropertyNamespace(HelixUtil.getMessagePath(clusterName, nodeId));
+      _store.createPropertyNamespace(HelixUtil.getCurrentStateBasePath(clusterName,
+                                                                       nodeId));
+      _store.createPropertyNamespace(HelixUtil.getErrorsPath(clusterName, nodeId));
+      _store.createPropertyNamespace(HelixUtil.getStatusUpdatesPath(clusterName, nodeId));
+    }
+    catch (Exception e)
+    {
+      logger.error("Fail to add node, cluster:" + clusterName + "\nexception: " + e);
+    }
+
+  }
+
+  @Override
+  public void dropInstance(String clusterName, InstanceConfig config)
+  {
+    String configsPath =
+        PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                   clusterName,
+                                   ConfigScopeProperty.PARTICIPANT.toString());
+    String nodeId = config.getId();
+    String nodeConfigPath = configsPath + "/" + nodeId;
+
+    try
+    {
+      // drop the instance by removing its config entry
+      _store.removeProperty(nodeConfigPath);
+    }
+    catch (Exception e)
+    {
+      logger.error("Fail to drop node, cluster:" + clusterName, e);
+    }
+  }
+
+  @Override
+  public IdealState getResourceIdealState(String clusterName, String resourceName)
+  {
+    return new FileDataAccessor(_store, clusterName).getProperty(IdealState.class,
+                                                                 PropertyType.IDEALSTATES,
+                                                                 resourceName);
+  }
+
+  @Override
+  public void setResourceIdealState(String clusterName,
+                                    String resourceName,
+                                    IdealState idealState)
+  {
+    new FileDataAccessor(_store, clusterName).setProperty(PropertyType.IDEALSTATES,
+                                                          idealState,
+                                                          resourceName);
+  }
+
+  @Override
+  public void enableInstance(String clusterName, String instanceName, boolean enabled)
+  {
+    throw new UnsupportedOperationException("enableInstance() is NOT supported by FileClusterManagementTool");
+  }
+
+  @Override
+  public void enableCluster(String clusterName, boolean enabled)
+  {
+    throw new UnsupportedOperationException("enableCluster() is NOT supported by FileClusterManagementTool");
+  }
+
+  @Override
+  public void addStateModelDef(String clusterName,
+                               String stateModelDef,
+                               StateModelDefinition stateModel)
+  {
+
+    String stateModelDefPath = HelixUtil.getStateModelDefinitionPath(clusterName);
+    String stateModelPath = stateModelDefPath + "/" + stateModelDef;
+
+    try
+    {
+      _store.setProperty(stateModelPath, stateModel.getRecord());
+    }
+    catch (PropertyStoreException e)
+    {
+      logger.error("Fail to addStateModelDef, cluster:" + clusterName + " stateModelDef:"
+          + stateModelDef, e);
+    }
+
+  }
+
+  @Override
+  public void dropResource(String clusterName, String resourceName)
+  {
+    new FileDataAccessor(_store, clusterName).removeProperty(PropertyType.IDEALSTATES,
+                                                             resourceName);
+
+  }
+
+  @Override
+  public List<String> getStateModelDefs(String clusterName)
+  {
+    throw new UnsupportedOperationException("getStateModelDefs() is NOT supported by FileClusterManagementTool");
+  }
+
+  @Override
+  public InstanceConfig getInstanceConfig(String clusterName, String instanceName)
+  {
+    throw new UnsupportedOperationException("getInstanceConfig() is NOT supported by FileClusterManagementTool");
+  }
+
+  @Override
+  public StateModelDefinition getStateModelDef(String clusterName, String stateModelName)
+  {
+    throw new UnsupportedOperationException("getStateModelDef() is NOT supported by FileClusterManagementTool");
+  }
+
+  @Override
+  public ExternalView getResourceExternalView(String clusterName, String resource)
+  {
+    throw new UnsupportedOperationException("getResourceExternalView() is NOT supported by FileClusterManagementTool");
+  }
+
+  @Override
+  public void enablePartition(boolean enabled,
+                              String clusterName,
+                              String instanceName,
+                              String resourceName,
+                              List<String> partitionNames)
+  {
+    throw new UnsupportedOperationException("enablePartition() is NOT supported by FileClusterManagementTool");
+  }
+
+  @Override
+  public void resetPartition(String clusterName,
+                             String instanceName,
+                             String resourceName,
+                             List<String> partitionNames)
+  {
+    throw new UnsupportedOperationException("resetPartition() is NOT supported by FileClusterManagementTool");
+  }
+
+  @Override
+  public void resetInstance(String clusterName, List<String> instanceNames)
+  {
+    throw new UnsupportedOperationException("resetInstance() is NOT supported by FileClusterManagementTool");
+  }
+
+  @Override
+  public void resetResource(String clusterName, List<String> resourceNames)
+  {
+    throw new UnsupportedOperationException("resetResource() is NOT supported by FileClusterManagementTool");
+  }
+
+  @Override
+  public void addStat(String clusterName, String statName)
+  {
+    throw new UnsupportedOperationException("addStat() is NOT supported by FileClusterManagementTool");
+
+  }
+
+  @Override
+  public void addAlert(String clusterName, String alertName)
+  {
+    throw new UnsupportedOperationException("addAlert() is NOT supported by FileClusterManagementTool");
+
+  }
+
+  @Override
+  public void dropStat(String clusterName, String statName)
+  {
+    throw new UnsupportedOperationException("dropStat() is NOT supported by FileClusterManagementTool");
+
+  }
+
+  @Override
+  public void dropAlert(String clusterName, String alertName)
+  {
+    throw new UnsupportedOperationException("dropAlert() is NOT supported by FileClusterManagementTool");
+
+  }
+
+  @Override
+  public void dropCluster(String clusterName)
+  {
+    throw new UnsupportedOperationException("dropCluster() is NOT supported by FileClusterManagementTool");
+  }
+
+  @Override
+  public void addClusterToGrandCluster(String clusterName, String grandCluster)
+  {
+    throw new UnsupportedOperationException("addCluster(clusterName, overwritePrevRecord, grandCluster) is NOT supported by FileClusterManagementTool");
+  }
+
+  @Override
+  public void setConfig(ConfigScope scope, Map<String, String> properties)
+  {
+    // TODO Auto-generated method stub
+    throw new UnsupportedOperationException("unsupported operation");
+
+  }
+
+  @Override
+  public Map<String, String> getConfig(ConfigScope scope, Set<String> keys)
+  {
+    // TODO Auto-generated method stub
+    throw new UnsupportedOperationException("unsupported operation");
+  }
+
+  @Override
+  public List<String> getConfigKeys(ConfigScopeProperty scope,
+                                    String clusterName,
+                                    String... keys)
+  {
+    // TODO Auto-generated method stub
+    throw new UnsupportedOperationException("unsupported operation");
+  }
+
+  @Override
+  public void removeConfig(ConfigScope scope, Set<String> keys)
+  {
+    // TODO Auto-generated method stub
+    throw new UnsupportedOperationException("unsupported operation");
+
+  }
+
+  @Override
+  public void rebalance(String clusterName, String resourceName, int replica)
+  {
+    // TODO Auto-generated method stub
+    throw new UnsupportedOperationException("unsupported operation");
+  }
+
+  @Override
+  public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException
+  {
+    // TODO Auto-generated method stub
+    throw new UnsupportedOperationException("unsupported operation");
+  }
+
+  @Override
+  public void addStateModelDef(String clusterName,
+                               String stateModelDefName,
+                               String stateModelDefFile) throws IOException
+  {
+    // TODO Auto-generated method stub
+    throw new UnsupportedOperationException("unsupported operation");
+  }
+
+  @Override
+  public void addMessageConstraint(String clusterName,
+                                   String constraintId,
+                                   Map<String, String> constraints)
+  {
+    // TODO Auto-generated method stub
+    throw new UnsupportedOperationException("unsupported operation");
+  }
+}
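
For reference, a minimal sketch of driving this deprecated file-based admin, e.g. from a test.
The store construction below (PropertyJsonSerializer/PropertyJsonComparator, the
"/tmp/helix-file-store" root, and the TestCluster/TestDB/localhost_12918 names) is an
assumption for illustration and not part of this patch; the HelixAdmin calls are the ones
defined above.

    // Sketch only: assumes a FilePropertyStore built the way helix-core does elsewhere.
    FilePropertyStore<ZNRecord> store =
        new FilePropertyStore<ZNRecord>(new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
                                        "/tmp/helix-file-store",
                                        new PropertyJsonComparator<ZNRecord>(ZNRecord.class));
    HelixAdmin admin = new FileHelixAdmin(store);

    admin.addCluster("TestCluster", true);   // overwrite any previous cluster record
    admin.addInstance("TestCluster", new InstanceConfig("localhost_12918"));
    admin.addResource("TestCluster", "TestDB", 8, "MasterSlave");  // 8 partitions
    List<String> instances = admin.getInstancesInCluster("TestCluster");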

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixDataAccessor.java
new file mode 100644
index 0000000..9b3db9b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixDataAccessor.java
@@ -0,0 +1,351 @@
+package org.apache.helix.manager.file;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.log4j.Logger;
+
+
+@Deprecated
+public class FileHelixDataAccessor implements HelixDataAccessor
+{
+  private static Logger LOG = Logger.getLogger(FileHelixDataAccessor.class);
+
+  private final FilePropertyStore<ZNRecord> _store;
+  private final String _clusterName;
+  private final ReadWriteLock _readWriteLock = new ReentrantReadWriteLock();
+  private final Builder _propertyKeyBuilder;
+
+
+  public FileHelixDataAccessor(FilePropertyStore<ZNRecord> store,
+      String clusterName)
+  {
+    _store = store;
+    _clusterName = clusterName;
+    _propertyKeyBuilder = new PropertyKey.Builder(_clusterName);
+  }
+
+  @Override
+  public boolean createProperty(PropertyKey key, HelixProperty value)
+  {
+    return updateProperty(key, value);
+  }
+
+  @Override
+  public <T extends HelixProperty> boolean setProperty(PropertyKey key, T value)
+  {
+    String path = key.getPath();
+    try
+    {
+      _readWriteLock.writeLock().lock();
+      _store.setProperty(path, value.getRecord());
+      return true;
+    }
+    catch(PropertyStoreException e)
+    {
+      LOG.error("Fail to set cluster property clusterName: " + _clusterName +
+                " type:" + key.getType() +
+                " keys:" + Arrays.toString(key.getParams()), e);
+      return false;
+    }
+    finally
+    {
+      _readWriteLock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public <T extends HelixProperty> boolean updateProperty(PropertyKey key,
+      T value)
+  {
+    PropertyType type = key.getType();
+    String path = key.getPath();
+
+    try
+    {
+      _readWriteLock.writeLock().lock();
+      
+      if (type.isUpdateOnlyOnExists())
+      {
+        updateIfExists(path, value.getRecord(), type.isMergeOnUpdate());
+      }
+      else
+      {
+        createOrUpdate(path, value.getRecord(), type.isMergeOnUpdate());
+      }
+      return true;
+    }
+    catch (PropertyStoreException e)
+    {
+      LOG.error("fail to update property. type:" +
+          type + ", keys:" + Arrays.toString(key.getParams()), e);
+      return false;
+    }
+    finally
+    {
+      _readWriteLock.writeLock().unlock();
+    }
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends HelixProperty> T getProperty(PropertyKey key)
+  {
+    String path = key.getPath();
+    try
+    {
+      _readWriteLock.readLock().lock();
+      ZNRecord record = _store.getProperty(path);
+      if (record == null)
+      {
+        return null;
+      }
+      return (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
+    }
+    catch(PropertyStoreException e)
+    {
+      LOG.error("Fail to get property. clusterName: " + _clusterName +
+                " type:" + key.getType() +
+                " keys:" + Arrays.toString(key.getParams()), e);
+      return null;
+    }
+    finally
+    {
+      _readWriteLock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean removeProperty(PropertyKey key)
+  {
+    String path = key.getPath();
+
+    try
+    {
+      _readWriteLock.writeLock().lock();
+      _store.removeProperty(path);
+      return true;
+    }
+    catch (PropertyStoreException e)
+    {
+      LOG.error("Fail to remove property. type:"  +
+          key.getType() + ", keys:" + Arrays.toString(key.getParams()), e);
+      return false;
+    }
+    finally
+    {
+      _readWriteLock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public List<String> getChildNames(PropertyKey key)
+  {
+    String path = key.getPath();
+
+    try
+    {
+      _readWriteLock.readLock().lock();
+
+      return _store.getPropertyNames(path);
+    }
+    catch(PropertyStoreException e)
+    {
+      LOG.error("Fail to get child names. clusterName: " + _clusterName +
+          ", parentPath:" + path, e);
+      
+      return Collections.emptyList();
+    }
+    finally
+    {
+      _readWriteLock.readLock().unlock();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
+  {
+    String path = key.getPath();
+    List<T> records = new ArrayList<T>();
+    try
+    {
+      _readWriteLock.readLock().lock();
+
+      List<String> children = _store.getPropertyNames(path);
+      if (children == null || children.isEmpty())
+      {
+        return Collections.emptyList();
+      }
+
+      for (String child : children)
+      {
+        ZNRecord record = _store.getProperty(child);
+        if (record != null)
+        {
+          records.add((T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record));
+        }
+      }
+    }
+    catch(PropertyStoreException e)
+    {
+      LOG.error("Fail to get child properties. clusterName:" + _clusterName +
+          ", parentPath:" + path, e);
+    }
+    finally
+    {
+      _readWriteLock.readLock().unlock();
+    }
+    
+    return records;
+  }
+
+  @Override
+  public <T extends HelixProperty> Map<String, T> getChildValuesMap(
+      PropertyKey key)
+  {
+    List<T> list = getChildValues(key);
+    return HelixProperty.convertListToMap(list);
+  }
+
+  @Override
+  public Builder keyBuilder()
+  {
+    return _propertyKeyBuilder;
+  }
+
+  @Override
+  public <T extends HelixProperty> boolean[] createChildren(
+      List<PropertyKey> keys, List<T> children)
+  {
+    boolean[] success = new boolean[keys.size()];
+    for (int i = 0; i < keys.size(); i++)
+    {
+      success[i] = createProperty(keys.get(i), children.get(i));
+    }
+    return success;
+  }
+
+  @Override
+  public <T extends HelixProperty> boolean[] setChildren(
+      List<PropertyKey> keys, List<T> children)
+  {
+    boolean[] success = new boolean[keys.size()];
+    for (int i = 0; i < keys.size(); i++)
+    {
+      success[i] = setProperty(keys.get(i), children.get(i));
+    }
+    return success;
+  }
+
+  @Override
+  public BaseDataAccessor<ZNRecord> getBaseDataAccessor()
+  {
+    throw new UnsupportedOperationException("No BaseDataAccessor for FileHelixDataAccessor");
+  }
+  
+  // HACK remove it later
+  public PropertyStore<ZNRecord> getStore()
+  {
+    return _store;
+  }
+
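+  /**
+   * Create the record at the given path if it does not exist, otherwise
+   * update it. With mergeOnUpdate the supplied record is merged into the
+   * current data; otherwise it replaces the current data. The write is
+   * retried up to RETRYLIMIT times on failure.
+   */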
+  private void createOrUpdate(String path, final ZNRecord record, final boolean mergeOnUpdate)
+      throws PropertyStoreException
+  {
+    final int RETRYLIMIT = 3;
+    int retryCount = 0;
+    while (retryCount < RETRYLIMIT)
+    {
+      try
+      {
+        if (_store.exists(path))
+        {
+          DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
+          {
+            @Override
+            public ZNRecord update(ZNRecord currentData)
+            {
+              if(mergeOnUpdate)
+              {
+                currentData.merge(record);
+                return currentData;
+              }
+              return record;
+            }
+          };
+          _store.updatePropertyUntilSucceed(path, updater);
+
+        }
+        else
+        {
+          if(record.getDeltaList().size() > 0)
+          {
+            ZNRecord newRecord = new ZNRecord(record.getId());
+            newRecord.merge(record);
+            _store.setProperty(path, newRecord);
+          }
+          else
+          {
+            _store.setProperty(path, record);
+          }
+        }
+        break;
+      }
+      catch (Exception e)
+      {
+        retryCount++;
+        LOG.warn("Exception trying to update " + path + " (attempt " + retryCount
+            + " of " + RETRYLIMIT + ")", e);
+      }
+    }
+  }
+  
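+  /**
+   * Set the record at the given path only if the path already exists;
+   * otherwise this is a no-op. Note that mergeOnUpdate is not consulted
+   * here: an existing record is simply overwritten.
+   */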
+  private void updateIfExists(String path, final ZNRecord record, boolean mergeOnUpdate)
+      throws PropertyStoreException
+  {
+    if (_store.exists(path))
+    {
+      _store.setProperty(path, record);
+    }
+  }
+
+  @Override
+  public <T extends HelixProperty> boolean[] updateChildren(List<String> paths,
+      List<DataUpdater<ZNRecord>> updaters, int options)
+  {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys)
+  {
+    List<T> list = new ArrayList<T>();
+    for (PropertyKey key : keys)
+    {
+      list.add((T)getProperty(key));
+    }
+    return list;
+  }
+  
+}
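
And a corresponding sketch for the file-based accessor. Here 'store' is assumed to be an
already-constructed FilePropertyStore<ZNRecord> as in the admin sketch above, and the
keyBuilder() accessors shown (idealStates, liveInstances) are assumed from the standard
PropertyKey.Builder; "TestCluster" and "TestDB" are illustrative names only.

    // Sketch only, not part of the patch.
    HelixDataAccessor accessor = new FileHelixDataAccessor(store, "TestCluster");
    Builder keyBuilder = accessor.keyBuilder();

    // read back the ideal state written via FileHelixAdmin.addResource(...)
    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB"));
    List<String> liveInstanceNames = accessor.getChildNames(keyBuilder.liveInstances());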