You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC
[11/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java b/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java
new file mode 100644
index 0000000..9a5f36d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/josql/ClusterJosqlQueryProcessor.java
@@ -0,0 +1,304 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.josql;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.log4j.Logger;
+import org.josql.Query;
+import org.josql.QueryExecutionException;
+import org.josql.QueryParseException;
+import org.josql.QueryResults;
+
+
+public class ClusterJosqlQueryProcessor
+{
+ public static final String PARTITIONS = "PARTITIONS";
+ public static final String FLATTABLE = ".Table";
+
+ HelixManager _manager;
+ private static Logger _logger = Logger.getLogger(ClusterJosqlQueryProcessor.class);
+
+ public ClusterJosqlQueryProcessor(HelixManager manager)
+ {
+ _manager = manager;
+ }
+
+ /**
+  * Extracts the token that follows the first "FROM" in the query.
+  *
+  * @param sql the JoSQL query text; "FROM" is matched case-sensitively
+  * @return the FROM target, delimited by spaces or the end of the query
+  * @throws HelixException if there is no "FROM" or no target follows it
+  */
+ String parseFromTarget(String sql)
+ {
+   int fromIndex = sql.indexOf("FROM");
+   if (fromIndex == -1)
+   {
+     throw new HelixException("Query must contain FROM target. Query: " + sql);
+   }
+   // Move to the first space after FROM, then past the whole run of spaces.
+   // Both scans are bounded by the query length, so "... FROM" and "... FROM  "
+   // are reported as an empty target instead of an index-out-of-bounds failure.
+   int start = sql.indexOf(' ', fromIndex);
+   start = (start == -1) ? sql.length() : start;
+   while (start < sql.length() && sql.charAt(start) == ' ')
+   {
+     start++;
+   }
+   int end = sql.indexOf(' ', start);
+   String fromTarget = sql.substring(start, end == -1 ? sql.length() : end).trim();
+   if (fromTarget.length() == 0)
+   {
+     throw new HelixException("FROM target in the query cannot be empty. Query: " + sql);
+   }
+   return fromTarget;
+ }
+
+ public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
+ List<Object> additionalFunctionHandlers, List queryTarget) throws QueryParseException,
+ QueryExecutionException
+ {
+   // Prepare the query with its bind variables and handlers, then parse and run it on the caller's rows.
+   Query preparedQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
+   preparedQuery.parse(josql);
+
+   QueryResults results = preparedQuery.execute(queryTarget);
+   return results.getResults();
+ }
+
+ Query prepareQuery(Map<String, Object> bindVariables, List<Object> additionalFunctionHandlers)
+ {
+ // Builds a Query with the cluster's ZNRecords bound as variables; state is read via the HelixDataAccessor
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+ // Get all the ZNRecords in the cluster and set them as bind variables
+ Builder keyBuilder = accessor.keyBuilder();
+// Participant configs, live instances and state model definitions are each
+// fetched as child values and converted to plain ZNRecord lists
+
+ List<ZNRecord> instanceConfigs = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
+
+ List<ZNRecord> liveInstances = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.liveInstances()));
+ List<ZNRecord> stateModelDefs = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.stateModelDefs()));
+
+ // Ideal states are also indexed in a map from record id to idealState ZNRecord
+ List<ZNRecord> idealStateList = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.idealStates()));
+
+ Map<String, ZNRecord> idealStatesMap = new HashMap<String, ZNRecord>();
+ for (ZNRecord idealState : idealStateList)
+ {
+ idealStatesMap.put(idealState.getId(), idealState);
+ }
+ // Build the partition list: one empty ZNRecord per map-field key of every ideal state
+ List<ZNRecord> partitions = new ArrayList<ZNRecord>();
+ for (ZNRecord idealState : idealStateList)
+ {
+ for (String partitionName : idealState.getMapFields().keySet())
+ {
+ partitions.add(new ZNRecord(partitionName));
+ }
+ }
+
+ List<ZNRecord> externalViewList = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.externalViews()));
+ // External views are indexed in a map from record id to externalView
+ // ZNRecord
+ Map<String, ZNRecord> externalViewMap = new HashMap<String, ZNRecord>();
+ for (ZNRecord externalView : externalViewList)
+ {
+ externalViewMap.put(externalView.getId(), externalView);
+ }
+ // Map from instance name to a map from resource to current state ZNRecord
+ Map<String, Map<String, ZNRecord>> currentStatesMap = new HashMap<String, Map<String, ZNRecord>>();
+ // Map from instance name to a list of combined flat ZNRecordRow
+ Map<String, List<ZNRecordRow>> flatCurrentStateMap = new HashMap<String, List<ZNRecordRow>>();
+
+ for (ZNRecord instance : liveInstances)
+ {
+ String host = instance.getId();
+ String sessionId = instance.getSimpleField(LiveInstanceProperty.SESSION_ID.toString());
+ Map<String, ZNRecord> currentStates = new HashMap<String, ZNRecord>();
+ List<ZNRecord> instanceCurrentStateList = new ArrayList<ZNRecord>();
+ for (ZNRecord idealState : idealStateList)
+ {
+ String resourceName = idealState.getId();
+
+ HelixProperty property = accessor.getProperty(keyBuilder.currentState(host, sessionId, resourceName));
+ ZNRecord currentState =null;
+ if (property == null)
+ {
+ _logger.warn("Resource " + resourceName + " has null currentState");
+ currentState = new ZNRecord(resourceName);
+ }else{
+ currentState = property.getRecord();
+ }
+ currentStates.put(resourceName, currentState);
+ instanceCurrentStateList.add(currentState);
+ }
+ currentStatesMap.put(host, currentStates);
+ flatCurrentStateMap.put(host, ZNRecordRow.flatten(instanceCurrentStateList));
+ }
+ Query josqlQuery = new Query();
+
+ // Set the default bind variables, named after the property types
+ josqlQuery
+.setVariable(
+ PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString(),
+ instanceConfigs);
+ josqlQuery.setVariable(PropertyType.IDEALSTATES.toString(), idealStatesMap);
+ josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString(), liveInstances);
+ josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString(), stateModelDefs);
+ josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString(), externalViewMap);
+ josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString(), currentStatesMap);
+ josqlQuery.setVariable(PARTITIONS, partitions);
+
+ // Flat (ZNRecordRow) versions of the same data, bound under the name + FLATTABLE
+ josqlQuery.setVariable(
+ PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString()
+ + FLATTABLE,
+ ZNRecordRow.flatten(instanceConfigs));
+ josqlQuery.setVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE,
+ ZNRecordRow.flatten(idealStateList));
+ josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE,
+ ZNRecordRow.flatten(liveInstances));
+ josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE,
+ ZNRecordRow.flatten(stateModelDefs));
+ josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE,
+ ZNRecordRow.flatten(externalViewList));
+ josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString() + FLATTABLE,
+ flatCurrentStateMap.values());
+ josqlQuery.setVariable(PARTITIONS + FLATTABLE, ZNRecordRow.flatten(partitions));
+ // Set additional bind variables; set last, so they replace a default of the same name
+ if (bindVariables != null)
+ {
+ for (String key : bindVariables.keySet())
+ {
+ josqlQuery.setVariable(key, bindVariables.get(key));
+ }
+ }
+
+ josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
+ josqlQuery.addFunctionHandler(new ZNRecordRow());
+ josqlQuery.addFunctionHandler(new Integer(0));
+ if (additionalFunctionHandlers != null)
+ {
+ for (Object functionHandler : additionalFunctionHandlers)
+ {
+ josqlQuery.addFunctionHandler(functionHandler);
+ }
+ }
+ return josqlQuery;
+ }
+
+ /**
+  * Runs a JoSQL query whose FROM target is one of the built-in cluster tables.
+  * JoSQL expects the FROM target to be the class of a "table row" and receives
+  * the rows through Query.execute(List). Callers instead write a table name
+  * (PARTITIONS, LIVEINSTANCES, CONFIGS/PARTICIPANT, STATEMODELDEFS, EXTERNALVIEW,
+  * IDEALSTATES, or any of them followed by ".Table" for the flattened form). The
+  * name is matched case-insensitively, the rows are taken from the bind variable
+  * of that name, and the name in the query text is replaced by the row class
+  * (ZNRecord, or ZNRecordRow for flat tables) before parsing.
+  *
+  * @param josql the query text; must contain "FROM" followed by a table name
+  * @param bindVariables extra bind variables, may be null
+  * @param additionalFunctionHandlers extra JoSQL function handlers, may be null
+  * @return the rows produced by the query
+  * @throws HelixException if the FROM target is missing or is not a known table
+  * @throws QueryParseException if JoSQL cannot parse the rewritten query
+  * @throws QueryExecutionException if JoSQL fails while executing the query
+  */
+ public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
+ List<Object> additionalFunctionHandlers) throws QueryParseException, QueryExecutionException
+ {
+   Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
+   String fromTargetString = parseFromTarget(josql);
+
+   String participantConfigs =
+       PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString();
+   String[] tableNames = {
+     PARTITIONS,
+     PropertyType.LIVEINSTANCES.toString(),
+     participantConfigs,
+     PropertyType.STATEMODELDEFS.toString(),
+     PropertyType.EXTERNALVIEW.toString(),
+     PropertyType.IDEALSTATES.toString()
+   };
+
+   // Resolve the target to the canonical bind variable name. Whether the table
+   // is flat is decided by which name matched, not by the caller's spelling, so
+   // that e.g. "partitions.table" is still treated as a ZNRecordRow table.
+   String variableName = null;
+   boolean isFlatTable = false;
+   for (String tableName : tableNames)
+   {
+     if (fromTargetString.equalsIgnoreCase(tableName))
+     {
+       variableName = tableName;
+       break;
+     }
+     if (fromTargetString.equalsIgnoreCase(tableName + FLATTABLE))
+     {
+       variableName = tableName + FLATTABLE;
+       isFlatTable = true;
+       break;
+     }
+   }
+   if (variableName == null)
+   {
+     throw new HelixException(
+         "Unknown query target "
+             + fromTargetString
+             + ". Target should be PARTITIONS, LIVEINSTANCES, CONFIGS, STATEMODELDEFS, IDEALSTATES, EXTERNALVIEW, and corresponding flat Tables");
+   }
+
+   // Some tables (IDEALSTATES, EXTERNALVIEW) are bound as maps keyed by record
+   // id; their rows are the map values. Casting such a map to List would fail
+   // with a ClassCastException.
+   Object fromTarget = josqlQuery.getVariable(variableName);
+   List fromTargetList;
+   if (fromTarget instanceof Map)
+   {
+     fromTargetList = new ArrayList<Object>(((Map<?, ?>) fromTarget).values());
+   }
+   else
+   {
+     fromTargetList = (List) fromTarget;
+   }
+   // Replace the table name that follows FROM (and only that occurrence, taken
+   // literally rather than as a regular expression) with the row class name.
+   String rowClassName = isFlatTable ? ZNRecordRow.class.getName() : ZNRecord.class.getName();
+   int targetStart = josql.indexOf(fromTargetString, josql.indexOf("FROM") + "FROM".length());
+   josql = josql.substring(0, targetStart) + rowClassName
+       + josql.substring(targetStart + fromTargetString.length());
+   josqlQuery.parse(josql);
+   QueryResults qr = josqlQuery.execute(fromTargetList);
+   return qr.getResults();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java b/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java
new file mode 100644
index 0000000..34c5c75
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/josql/ZNRecordJosqlFunctionHandler.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.josql;
+
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.josql.functions.AbstractFunctionHandler;
+
+
+
+public class ZNRecordJosqlFunctionHandler extends AbstractFunctionHandler
+{
+ public boolean hasSimpleField(ZNRecord record, String fieldName, String field)
+ {
+   // True when the simple field exists and equals the expected value. A null
+   // expected value never matches; checking it first avoids the
+   // NullPointerException that field.equals(...) would otherwise raise.
+   return field != null
+       && record.getSimpleFields().containsKey(fieldName)
+       && field.equals(record.getSimpleField(fieldName));
+ }
+
+ public boolean hasListField(ZNRecord record, String fieldName, String field)
+ {
+ if(!record.getListFields().containsKey(fieldName))
+ {
+ return false;
+ }
+ return record.getListField(fieldName).contains(field);
+ }
+
+ public boolean hasMapFieldValue(ZNRecord record, String fieldName, String mapKey, String mapValue)
+ {
+   // True only when the map field exists, has the key, and the key maps to mapValue.
+   if (!record.getMapFields().containsKey(fieldName))
+   {
+     return false;
+   }
+   // Look the value up once; a missing key and a key stored with a null value
+   // both give null here and are reported as "no match" rather than failing.
+   String storedValue = record.getMapField(fieldName).get(mapKey);
+   return storedValue != null && storedValue.equals(mapValue);
+ }
+
+ public boolean hasMapFieldKey(ZNRecord record, String fieldName, String mapKey)
+ {
+ if(!record.getMapFields().containsKey(fieldName))
+ {
+ return false;
+ }
+ return record.getMapField(fieldName).containsKey(mapKey);
+ }
+
+ public String getMapFieldValue(ZNRecord record, String fieldName, String mapKey)
+ {
+ if(record.getMapFields().containsKey(fieldName))
+ {
+ return record.getMapField(fieldName).get(mapKey);
+ }
+ return null;
+ }
+
+ public String getSimpleFieldValue(ZNRecord record, String key)
+ {
+ return record.getSimpleField(key);
+ }
+
+ public ZNRecord getZNRecordFromMap(Map<String, ZNRecord> recordMap, String key)
+ {
+ return recordMap.get(key);
+ }
+
+ public ZNRecord getZNRecordFromMap(Map<String, Map<String, ZNRecord>> recordMap, String key, String subKey)
+ {
+   // An unknown outer key yields null, like the two-argument overload does for
+   // an unknown key, instead of failing with a NullPointerException.
+   Map<String, ZNRecord> innerMap = recordMap.get(key);
+   return (innerMap == null) ? null : innerMap.get(subKey);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java b/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java
new file mode 100644
index 0000000..462b690
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/josql/ZNRecordRow.java
@@ -0,0 +1,195 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.josql;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+
+
+/**
+ * A normalized (flattened) form of ZNRecord: each row holds one simple field, one list element, or one map entry of a record
+ * */
+public class ZNRecordRow
+{
+ // "Field names" in the flattened ZNRecord
+ public static final String SIMPLE_KEY = "simpleKey";
+ public static final String SIMPLE_VALUE = "simpleValue";
+
+ public static final String LIST_KEY = "listKey";
+ public static final String LIST_VALUE = "listValue";
+ public static final String LIST_VALUE_INDEX = "listValueIndex";
+
+ public static final String MAP_KEY = "mapKey";
+ public static final String MAP_SUBKEY = "mapSubKey";
+ public static final String MAP_VALUE = "mapValue";
+ public static final String ZNRECORD_ID = "recordId";
+ // ZNRECORD path ?
+
+
+ final Map<String, String> _rowDataMap = new HashMap<String, String>();
+
+ public ZNRecordRow()
+ {
+ _rowDataMap.put(SIMPLE_KEY, "");
+ _rowDataMap.put(SIMPLE_VALUE, "");
+ _rowDataMap.put(LIST_KEY, "");
+ _rowDataMap.put(LIST_VALUE, "");
+ _rowDataMap.put(LIST_VALUE_INDEX, "");
+ _rowDataMap.put(MAP_KEY, "");
+ _rowDataMap.put(MAP_SUBKEY, "");
+ _rowDataMap.put(MAP_VALUE, "");
+ _rowDataMap.put(ZNRECORD_ID, "");
+ }
+
+ public String getField(String rowField)
+ {
+ return _rowDataMap.get(rowField);
+ }
+
+ public void putField(String fieldName, String fieldValue)
+ {
+ _rowDataMap.put(fieldName, fieldValue);
+ }
+ public String getListValueIndex()
+ {
+ return getField(LIST_VALUE_INDEX);
+ }
+ public String getSimpleKey()
+ {
+ return getField(SIMPLE_KEY);
+ }
+
+ public String getSimpleValue()
+ {
+ return getField(SIMPLE_VALUE);
+ }
+
+ public String getListKey()
+ {
+ return getField(LIST_KEY);
+ }
+
+ public String getListValue()
+ {
+ return getField(LIST_VALUE);
+ }
+
+ public String getMapKey()
+ {
+ return getField(MAP_KEY);
+ }
+
+ public String getMapSubKey()
+ {
+ return getField(MAP_SUBKEY);
+ }
+
+ public String getMapValue()
+ {
+ return getField(MAP_VALUE);
+ }
+
+ public String getRecordId()
+ {
+ return getField(ZNRECORD_ID);
+ }
+
+ /* Josql function handlers */
+ public static String getField(ZNRecordRow row, String rowField)
+ {
+ return row.getField(rowField);
+ }
+
+ public static List<ZNRecordRow> convertSimpleFields(ZNRecord record)
+ {
+   // One row per simple field, carrying the record id, the field name and its value.
+   List<ZNRecordRow> rows = new ArrayList<ZNRecordRow>();
+   for (String fieldName : record.getSimpleFields().keySet())
+   {
+     ZNRecordRow simpleRow = new ZNRecordRow();
+     simpleRow.putField(ZNRECORD_ID, record.getId());
+     simpleRow.putField(SIMPLE_KEY, fieldName);
+     simpleRow.putField(SIMPLE_VALUE, record.getSimpleField(fieldName));
+     rows.add(simpleRow);
+   }
+   return rows;
+ }
+
+ /**
+  * Flattens the list fields of a record into rows, one row per list element.
+  * Each row carries the record id, the list field name (LIST_KEY), the element
+  * (LIST_VALUE) and the element's zero-based position (LIST_VALUE_INDEX).
+  *
+  * @param record the record whose list fields are flattened
+  * @return the rows, grouped by list field and ordered by position within it
+  */
+ public static List<ZNRecordRow> convertListFields(ZNRecord record)
+ {
+   List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
+   for (String key : record.getListFields().keySet())
+   {
+     int order = 0;
+     for (String value : record.getListField(key))
+     {
+       ZNRecordRow row = new ZNRecordRow();
+       row.putField(ZNRECORD_ID, record.getId());
+       row.putField(LIST_KEY, key);
+       // The row value is the list element itself, not a simple field that
+       // happens to share the list's name.
+       row.putField(LIST_VALUE, value);
+       row.putField(LIST_VALUE_INDEX, String.valueOf(order));
+       order++;
+       result.add(row);
+     }
+   }
+   return result;
+ }
+
+ public static List<ZNRecordRow> convertMapFields(ZNRecord record)
+ {
+   // One row per (map field, sub-key) pair, carrying the record id and the mapped value.
+   List<ZNRecordRow> rows = new ArrayList<ZNRecordRow>();
+   for (String mapName : record.getMapFields().keySet())
+   {
+     for (String subKey : record.getMapField(mapName).keySet())
+     {
+       ZNRecordRow mapRow = new ZNRecordRow();
+       mapRow.putField(ZNRECORD_ID, record.getId());
+       mapRow.putField(MAP_KEY, mapName);
+       mapRow.putField(MAP_SUBKEY, subKey);
+       mapRow.putField(MAP_VALUE, record.getMapField(mapName).get(subKey));
+       rows.add(mapRow);
+     }
+   }
+   return rows;
+ }
+
+ public static List<ZNRecordRow> flatten(ZNRecord record)
+ {
+   // Row order is fixed: map-field rows, then list-field rows, then simple-field rows.
+   List<ZNRecordRow> mapRows = convertMapFields(record);
+   List<ZNRecordRow> listRows = convertListFields(record);
+   List<ZNRecordRow> simpleRows = convertSimpleFields(record);
+
+   mapRows.addAll(listRows);
+   mapRows.addAll(simpleRows);
+   return mapRows;
+ }
+
+ public static List<ZNRecordRow> flatten(Collection<ZNRecord> recordList)
+ {
+   // Concatenates the rows of every record in iteration order; within a record
+   // the map-field rows come first, then list-field rows, then simple-field rows.
+   List<ZNRecordRow> allRows = new ArrayList<ZNRecordRow>();
+   for (ZNRecord current : recordList)
+   {
+     allRows.addAll(convertMapFields(current));
+     allRows.addAll(convertListFields(current));
+     allRows.addAll(convertSimpleFields(current));
+   }
+   return allRows;
+ }
+
+ public static List<ZNRecordRow> getRowListFromMap(Map<String, List<ZNRecordRow>> rowMap, String key)
+ {
+ return rowMap.get(key);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/josql/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/josql/package-info.java b/helix-core/src/main/java/org/apache/helix/josql/package-info.java
new file mode 100644
index 0000000..d0e7c52
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/josql/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * JoSQL query processor for Helix
+ *
+ */
+package org.apache.helix.josql;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/DynamicFileHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/DynamicFileHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/file/DynamicFileHelixManager.java
new file mode 100644
index 0000000..5773841
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/DynamicFileHelixManager.java
@@ -0,0 +1,460 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.tools.PropertiesReader;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+
+@Deprecated
+public class DynamicFileHelixManager implements HelixManager
+{
+ // Log under this class's own name rather than StaticFileHelixManager's.
+ private static final Logger LOG = Logger.getLogger(DynamicFileHelixManager.class.getName());
+ private final FileDataAccessor _fileDataAccessor;
+ private final FileHelixDataAccessor _accessor;
+
+ private final String _clusterName;
+ private final InstanceType _instanceType;
+ private final String _instanceName;
+ private boolean _isConnected;
+ private final List<FileCallbackHandler> _handlers;
+ private final FileHelixAdmin _mgmtTool;
+
+ private final String _sessionId; // = "12345";
+ public static final String configFile = "configFile";
+ private final DefaultMessagingService _messagingService;
+ private final FilePropertyStore<ZNRecord> _store;
+ private final String _version;
+ private final StateMachineEngine _stateMachEngine;
+ private PropertyStore<ZNRecord> _propertyStore = null;
+
+ public DynamicFileHelixManager(String clusterName, String instanceName,
+ InstanceType instanceType, FilePropertyStore<ZNRecord> store)
+ {
+ _clusterName = clusterName;
+ _instanceName = instanceName;
+ _instanceType = instanceType;
+
+ _handlers = new ArrayList<FileCallbackHandler>();
+
+ _store = store;
+ _fileDataAccessor = new FileDataAccessor(_store, clusterName);
+ _accessor = new FileHelixDataAccessor(_store, clusterName);
+
+ _mgmtTool = new FileHelixAdmin(_store);
+ _messagingService = new DefaultMessagingService(this);
+ _sessionId = UUID.randomUUID().toString();
+ if (instanceType == InstanceType.PARTICIPANT)
+ {
+ addLiveInstance();
+ addMessageListener(_messagingService.getExecutor(), _instanceName);
+ }
+
+ _version = new PropertiesReader("cluster-manager-version.properties")
+ .getProperty("clustermanager.version");
+
+ _stateMachEngine = new HelixStateMachineEngine(this);
+
+ _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
+ _stateMachEngine);
+ }
+
+ @Override
+ public void disconnect()
+ {
+ _store.stop();
+ _messagingService.getExecutor().shutDown();
+
+ _isConnected = false;
+ }
+
+ @Override
+ public void addIdealStateChangeListener(IdealStateChangeListener listener)
+ {
+ final String path = HelixUtil.getIdealStatePath(_clusterName);
+
+ FileCallbackHandler callbackHandler = createCallBackHandler(path, listener, new EventType[] {
+ EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated }, IDEAL_STATE);
+ _handlers.add(callbackHandler);
+
+ }
+
+ @Override
+ public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener)
+ {
+ final String path = HelixUtil.getLiveInstancesPath(_clusterName);
+ FileCallbackHandler callbackHandler = createCallBackHandler(path, listener, new EventType[] {
+ EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated },
+ LIVE_INSTANCE);
+ _handlers.add(callbackHandler);
+ }
+
+ @Override
+ public void addConfigChangeListener(ConfigChangeListener listener)
+ {
+ throw new UnsupportedOperationException(
+ "addConfigChangeListener() is NOT supported by File Based cluster manager");
+ }
+
+ @Override
+ public void addMessageListener(MessageListener listener, String instanceName)
+ {
+ final String path = HelixUtil.getMessagePath(_clusterName, instanceName);
+
+ FileCallbackHandler callbackHandler = createCallBackHandler(path, listener, new EventType[] {
+ EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated },
+ ChangeType.MESSAGE);
+ _handlers.add(callbackHandler);
+
+ }
+
+ @Override
+ public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
+ String instanceName, String sessionId)
+ {
+ final String path = HelixUtil.getCurrentStateBasePath(_clusterName, instanceName) + "/"
+ + sessionId;
+
+ FileCallbackHandler callbackHandler = createCallBackHandler(path, listener, new EventType[] {
+ EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated },
+ CURRENT_STATE);
+
+ _handlers.add(callbackHandler);
+ }
+
+ @Override
+ public void addExternalViewChangeListener(ExternalViewChangeListener listener)
+ {
+ throw new UnsupportedOperationException(
+ "addExternalViewChangeListener() is NOT supported by File Based cluster manager");
+ }
+
+ @Override
+ public DataAccessor getDataAccessor()
+ {
+ return _fileDataAccessor;
+ }
+
+ @Override
+ public String getClusterName()
+ {
+ return _clusterName;
+ }
+
+ @Override
+ public String getInstanceName()
+ {
+ return _instanceName;
+ }
+
+ @Override
+ public void connect()
+ {
+ if (!isClusterSetup(_clusterName))
+ {
+ throw new HelixException("Initial cluster structure is not set up for cluster:"
+ + _clusterName);
+ }
+ _messagingService.onConnected();
+ _store.start();
+ _isConnected = true;
+ }
+
+ @Override
+ public String getSessionId()
+ {
+ return _sessionId;
+ }
+
+ @Override
+ public boolean isConnected()
+ {
+ return _isConnected;
+ }
+
+ private boolean isClusterSetup(String clusterName)
+ {
+ if (clusterName == null || _store == null)
+ {
+ return false;
+ }
+
+ boolean isValid = _store.exists(PropertyPathConfig.getPath(PropertyType.IDEALSTATES,
+ clusterName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+ ConfigScopeProperty.CLUSTER.toString()))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+ ConfigScopeProperty.PARTICIPANT.toString()))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+ ConfigScopeProperty.RESOURCE.toString()))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES, clusterName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.INSTANCES, clusterName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.CONTROLLER, clusterName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS, clusterName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER,
+ clusterName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName));
+
+ return isValid;
+ }
+
+ private boolean isInstanceSetup()
+ {
+ if (_instanceType == InstanceType.PARTICIPANT
+ || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
+ {
+ boolean isValid = _store.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
+ _clusterName, ConfigScopeProperty.PARTICIPANT.toString(), _instanceName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.MESSAGES, _clusterName,
+ _instanceName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName,
+ _instanceName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES, _clusterName,
+ _instanceName))
+ && _store.exists(PropertyPathConfig.getPath(PropertyType.ERRORS, _clusterName,
+ _instanceName));
+
+ return isValid;
+ }
+ return true;
+ }
+
+ private void addLiveInstance()
+ {
+ if (!isClusterSetup(_clusterName))
+ {
+ throw new HelixException("Initial cluster structure is not set up for cluster:"
+ + _clusterName);
+ }
+
+ if (!isInstanceSetup())
+ {
+ throw new HelixException("Instance is not configured for instance:" + _instanceName
+ + " instanceType:" + _instanceType);
+ }
+
+ LiveInstance liveInstance = new LiveInstance(_instanceName);
+ liveInstance.setSessionId(_sessionId);
+// _fileDataAccessor.setProperty(PropertyType.LIVEINSTANCES, liveInstance.getRecord(),
+// _instanceName);
+
+ Builder keyBuilder = _accessor.keyBuilder();
+ _accessor.setProperty(keyBuilder.liveInstance(_instanceName), liveInstance);
+
+ }
+
+ /**
+  * Always returns 0; this manager does not report a notification time.
+  */
+ @Override
+ public long getLastNotificationTime()
+ {
+   return 0L;
+ }
+
+ /**
+  * Not supported by this manager.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public void addControllerListener(ControllerChangeListener listener)
+ {
+   final String reason = "addControllerListener() is NOT supported by File Based cluster manager";
+   throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+  * Stub: listener removal is not implemented here, so nothing is removed.
+  *
+  * @return always false
+  */
+ @Override
+ public boolean removeListener(Object listener)
+ {
+   // TODO implement listener removal
+   return false;
+ }
+
+ /**
+  * Builds a FileCallbackHandler bound to this manager for the given path and listener.
+  *
+  * @throws HelixException if listener is null
+  */
+ private FileCallbackHandler createCallBackHandler(String path, Object listener,
+     EventType[] eventTypes, ChangeType changeType)
+ {
+   if (listener != null)
+   {
+     return new FileCallbackHandler(this, path, listener, eventTypes, changeType);
+   }
+   throw new HelixException("Listener cannot be null");
+ }
+
+ /**
+  * Returns the HelixAdmin instance held by this manager.
+  */
+ @Override
+ public HelixAdmin getClusterManagmentTool()
+ {
+   return this._mgmtTool;
+ }
+
+ /**
+  * Guard used by operations that require a connected manager.
+  *
+  * @throws HelixException if {@link #isConnected()} is false
+  */
+ private void checkConnected()
+ {
+   if (isConnected())
+   {
+     return;
+   }
+   throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
+ }
+
+ /**
+  * Returns the property store for this cluster, creating it on first use. The store is
+  * rooted at the file store's root namespace followed by the cluster's PROPERTYSTORE path.
+  *
+  * @throws HelixException if the manager is not connected
+  */
+ @Override
+ public PropertyStore<ZNRecord> getPropertyStore()
+ {
+   checkConnected();
+
+   if (_propertyStore != null)
+   {
+     return _propertyStore;
+   }
+
+   String relativePath = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
+   String root = _store.getPropertyRootNamespace() + relativePath;
+
+   PropertyJsonSerializer<ZNRecord> serializer =
+       new PropertyJsonSerializer<ZNRecord>(ZNRecord.class);
+   PropertyJsonComparator<ZNRecord> comparator =
+       new PropertyJsonComparator<ZNRecord>(ZNRecord.class);
+   _propertyStore = new FilePropertyStore<ZNRecord>(serializer, root, comparator);
+   return _propertyStore;
+ }
+
+ /**
+  * Returns the messaging service held by this manager.
+  */
+ @Override
+ public ClusterMessagingService getMessagingService()
+ {
+   return this._messagingService;
+ }
+
+ /**
+  * Stub: no health report collector is provided by this manager.
+  *
+  * @return always null
+  */
+ @Override
+ public ParticipantHealthReportCollector getHealthReportCollector()
+ {
+   // TODO not implemented
+   return null;
+ }
+
+ /**
+  * Returns the instance type this manager was configured with.
+  */
+ @Override
+ public InstanceType getInstanceType()
+ {
+   return this._instanceType;
+ }
+
+ /**
+  * Stub: the listener is ignored; nothing is registered.
+  */
+ @Override
+ public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
+     throws Exception
+ {
+   // TODO not implemented
+ }
+
+ /**
+  * Returns the version string held by this manager.
+  */
+ @Override
+ public String getVersion()
+ {
+   return this._version;
+ }
+
+ /**
+  * Returns the state machine engine held by this manager.
+  */
+ @Override
+ public StateMachineEngine getStateMachineEngine()
+ {
+   return this._stateMachEngine;
+ }
+
+ /**
+  * Reports leadership purely from the instance type: true exactly when this manager was
+  * created as a CONTROLLER.
+  */
+ @Override
+ public boolean isLeader()
+ {
+   return _instanceType == InstanceType.CONTROLLER;
+ }
+
+ /**
+  * Stub: no config accessor is provided by this manager.
+  *
+  * @return always null
+  */
+ @Override
+ public ConfigAccessor getConfigAccessor()
+ {
+   // TODO not implemented
+   return null;
+ }
+
+ /**
+  * Stub: does nothing.
+  */
+ @Override
+ public void startTimerTasks()
+ {
+   // TODO not implemented
+ }
+
+ /**
+  * Stub: does nothing.
+  */
+ @Override
+ public void stopTimerTasks()
+ {
+   // TODO not implemented
+ }
+
+ /**
+  * Returns the data accessor held by this manager.
+  */
+ @Override
+ public HelixDataAccessor getHelixDataAccessor()
+ {
+   return this._accessor;
+ }
+
+ /**
+  * Stub: the callback is ignored; nothing is registered.
+  */
+ @Override
+ public void addPreConnectCallback(PreConnectCallback callback)
+ {
+   // TODO not implemented
+ }
+
+ /**
+  * Stub: no ZkHelixPropertyStore is provided by this manager.
+  *
+  * @return always null
+  */
+ @Override
+ public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
+ {
+   // TODO not implemented
+   return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/FileCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/FileCallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/file/FileCallbackHandler.java
new file mode 100644
index 0000000..12814a2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/FileCallbackHandler.java
@@ -0,0 +1,293 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
+import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
+import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixManager;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+
+ // TODO remove code duplication: CallbackHandler and FileCallbackHandler
+ /**
+  * Bridges change notifications from a {@link FilePropertyStore} to a Helix listener.
+  * On construction the handler invokes the listener once with an INIT context, which also
+  * subscribes the handler to the configured path in the store; later store changes under
+  * that path are forwarded as CALLBACK invocations.
+  */
+ @Deprecated
+ public class FileCallbackHandler implements PropertyChangeListener<ZNRecord>
+ {
+
+   private static final Logger LOG = Logger.getLogger(FileCallbackHandler.class);
+
+   private final String _path;
+   private final Object _listener;
+   private final EventType[] _eventTypes;
+   private final ChangeType _changeType;
+   private final FileHelixDataAccessor _accessor;
+   // highest System.nanoTime() value recorded by init() / onPropertyChange()
+   private final AtomicLong lastNotificationTimeStamp;
+   private final HelixManager _manager;
+   private final FilePropertyStore<ZNRecord> _store;
+
+   /**
+    * Creates the handler and immediately runs {@link #init()}.
+    *
+    * @param manager    manager whose data accessor must be a FileHelixDataAccessor
+    * @param path       store path to watch
+    * @param listener   listener to notify; must implement the interface matching changeType
+    * @param eventTypes event types, kept only for {@link #getEventTypes()}
+    * @param changeType selects which listener interface and which data are used on invoke
+    */
+   public FileCallbackHandler(HelixManager manager, String path, Object listener,
+       EventType[] eventTypes, ChangeType changeType)
+   {
+     this._manager = manager;
+     this._accessor = (FileHelixDataAccessor) manager.getHelixDataAccessor();
+     this._path = path;
+     this._listener = listener;
+     this._eventTypes = eventTypes;
+     this._changeType = changeType;
+     this._store = (FilePropertyStore<ZNRecord>) _accessor.getStore();
+     lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
+
+     init();
+   }
+
+   public Object getListener()
+   {
+     return _listener;
+   }
+
+   public Object getPath()
+   {
+     return _path;
+   }
+
+   /**
+    * Reads the data matching the configured change type and passes it to the listener.
+    * Change types not handled below result in no listener call.
+    *
+    * @param changeContext context handed to the listener; an INIT type also triggers the
+    *                      store subscription
+    * @throws Exception whatever the accessor or the listener throws
+    */
+   public void invoke(NotificationContext changeContext) throws Exception
+   {
+     // This allows the listener to work with one change at a time
+     synchronized (_listener)
+     {
+       if (LOG.isDebugEnabled())
+       {
+         LOG.debug(Thread.currentThread().getId() + " START:INVOKE "
+             + changeContext.getPathChanged() + " listener:"
+             + _listener.getClass().getCanonicalName());
+       }
+
+       Builder keyBuilder = _accessor.keyBuilder();
+
+       if (_changeType == IDEAL_STATE)
+       {
+         IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
+         subscribeForChanges(changeContext, true, true);
+         List<IdealState> idealStates = _accessor.getChildValues(keyBuilder.idealStates());
+         idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
+       }
+       else if (_changeType == CONFIG)
+       {
+         ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
+         subscribeForChanges(changeContext, true, true);
+         List<InstanceConfig> configs = _accessor.getChildValues(keyBuilder.instanceConfigs());
+         configChangeListener.onConfigChange(configs, changeContext);
+       }
+       else if (_changeType == LIVE_INSTANCE)
+       {
+         LiveInstanceChangeListener liveInstanceChangeListener =
+             (LiveInstanceChangeListener) _listener;
+         subscribeForChanges(changeContext, true, false);
+         List<LiveInstance> liveInstances = _accessor.getChildValues(keyBuilder.liveInstances());
+         liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
+       }
+       else if (_changeType == CURRENT_STATE)
+       {
+         CurrentStateChangeListener currentStateChangeListener =
+             (CurrentStateChangeListener) _listener;
+         subscribeForChanges(changeContext, true, true);
+         String instanceName = HelixUtil.getInstanceNameFromPath(_path);
+         // the last path segment is passed as the second key to currentStates()
+         String[] pathParts = _path.split("/");
+         List<CurrentState> currentStates =
+             _accessor.getChildValues(keyBuilder.currentStates(instanceName,
+                 pathParts[pathParts.length - 1]));
+         currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
+       }
+       else if (_changeType == MESSAGE)
+       {
+         MessageListener messageListener = (MessageListener) _listener;
+         subscribeForChanges(changeContext, true, false);
+         String instanceName = _manager.getInstanceName();
+         List<Message> messages = _accessor.getChildValues(keyBuilder.messages(instanceName));
+         messageListener.onMessage(instanceName, messages, changeContext);
+       }
+       else if (_changeType == EXTERNAL_VIEW)
+       {
+         ExternalViewChangeListener externalViewListener = (ExternalViewChangeListener) _listener;
+         subscribeForChanges(changeContext, true, true);
+         List<ExternalView> externalViewList =
+             _accessor.getChildValues(keyBuilder.externalViews());
+         externalViewListener.onExternalViewChange(externalViewList, changeContext);
+       }
+       else if (_changeType == ChangeType.CONTROLLER)
+       {
+         ControllerChangeListener controllerChangelistener = (ControllerChangeListener) _listener;
+         subscribeForChanges(changeContext, true, false);
+         controllerChangelistener.onControllerChange(changeContext);
+       }
+
+       if (LOG.isDebugEnabled())
+       {
+         LOG.debug(Thread.currentThread().getId() + " END:INVOKE " + changeContext.getPathChanged()
+             + " listener:" + _listener.getClass().getCanonicalName());
+       }
+     }
+   }
+
+   /**
+    * Subscribes this handler to the configured path, but only for INIT contexts.
+    * The watchParent / watchChild flags are currently not used by the file-based store
+    * subscription.
+    */
+   private void subscribeForChanges(NotificationContext changeContext, boolean watchParent,
+       boolean watchChild)
+   {
+     if (changeContext.getType() == NotificationContext.Type.INIT)
+     {
+       try
+       {
+         _store.subscribeForPropertyChange(_path, this);
+       }
+       catch (PropertyStoreException e)
+       {
+         // pass the exception itself so the stack trace reaches the log
+         LOG.error("fail to subscribe for changes", e);
+       }
+     }
+   }
+
+   public EventType[] getEventTypes()
+   {
+     return _eventTypes;
+   }
+
+   // this will invoke the listener so that it sets up the initial values from
+   // the file property store if any exists
+   public void init()
+   {
+     updateNotificationTime(System.nanoTime());
+     try
+     {
+       NotificationContext changeContext = new NotificationContext(_manager);
+       changeContext.setType(NotificationContext.Type.INIT);
+       invoke(changeContext);
+     }
+     catch (Exception e)
+     {
+       // TODO handle exception
+       LOG.error("fail to init", e);
+     }
+   }
+
+   /**
+    * Invokes the listener once with a FINALIZE context. Failures are logged, not rethrown.
+    */
+   public void reset()
+   {
+     try
+     {
+       NotificationContext changeContext = new NotificationContext(_manager);
+       changeContext.setType(NotificationContext.Type.FINALIZE);
+       invoke(changeContext);
+     }
+     catch (Exception e)
+     {
+       // TODO handle exception
+       LOG.error("fail to reset", e);
+     }
+   }
+
+   /**
+    * Raises the stored timestamp to nanoTime if nanoTime is larger; retries the
+    * compare-and-set until it succeeds or another thread has stored a value at least as large.
+    */
+   private void updateNotificationTime(long nanoTime)
+   {
+     long current = lastNotificationTimeStamp.get();
+     while (nanoTime > current)
+     {
+       if (lastNotificationTimeStamp.compareAndSet(current, nanoTime))
+       {
+         break;
+       }
+       current = lastNotificationTimeStamp.get();
+     }
+   }
+
+   /**
+    * Store callback: forwards the change to the listener as a CALLBACK invocation when the
+    * key is relevant for this handler. Failures are logged, not rethrown.
+    */
+   @Override
+   public void onPropertyChange(String key)
+   {
+     try
+     {
+       if (needToNotify(key))
+       {
+         updateNotificationTime(System.nanoTime());
+         NotificationContext changeContext = new NotificationContext(_manager);
+         changeContext.setType(NotificationContext.Type.CALLBACK);
+         invoke(changeContext);
+       }
+     }
+     catch (Exception e)
+     {
+       // TODO handle exception
+       LOG.error("fail onPropertyChange", e);
+     }
+   }
+
+   /**
+    * A key is relevant when it starts with the watched path, for every supported change
+    * type; unsupported change types are never notified.
+    */
+   private boolean needToNotify(String key)
+   {
+     boolean ret = false;
+     switch (_changeType)
+     {
+     // both child/data changes matter
+     case IDEAL_STATE:
+     case CURRENT_STATE:
+     case CONFIG:
+       ret = key.startsWith(_path);
+       break;
+     // only child changes matter; a prefix match is used here as well
+     case LIVE_INSTANCE:
+     case MESSAGE:
+     case EXTERNAL_VIEW:
+     case CONTROLLER:
+       ret = key.startsWith(_path);
+       break;
+     default:
+       break;
+     }
+
+     return ret;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/FileDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/FileDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/file/FileDataAccessor.java
new file mode 100644
index 0000000..571586c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/FileDataAccessor.java
@@ -0,0 +1,319 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.log4j.Logger;
+
+
+@Deprecated
+public class FileDataAccessor implements DataAccessor
+{
+ private static Logger LOG = Logger.getLogger(FileDataAccessor.class);
+ // store that is used by FileDataAccessor
+ private final FilePropertyStore<ZNRecord> _store;
+ private final String _clusterName;
+ private final ReadWriteLock _readWriteLock = new ReentrantReadWriteLock();
+
+ public FileDataAccessor(FilePropertyStore<ZNRecord> store, String clusterName)
+ {
+ _store = store;
+ _clusterName = clusterName;
+ }
+
+ @Override
+ public boolean setProperty(PropertyType type, HelixProperty value, String... keys)
+ {
+ return setProperty(type, value.getRecord(), keys);
+ }
+
+ @Override
+ public boolean setProperty(PropertyType type, ZNRecord value, String... keys)
+ {
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+ try
+ {
+ _readWriteLock.writeLock().lock();
+ _store.setProperty(path, value);
+ return true;
+ }
+ catch(PropertyStoreException e)
+ {
+ LOG.error("Fail to set cluster property clusterName: " + _clusterName +
+ " type:" + type +
+ " keys:" + keys + "\nexception: " + e);
+ }
+ finally
+ {
+ _readWriteLock.writeLock().unlock();
+ }
+ return false;
+ }
+
+ @Override
+ public boolean updateProperty(PropertyType type, HelixProperty value, String... keys)
+ {
+ return updateProperty(type, value.getRecord(), keys);
+ }
+
+ @Override
+ public boolean updateProperty(PropertyType type, ZNRecord value, String... keys)
+ {
+ try
+ {
+ _readWriteLock.writeLock().lock();
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+ if (type.isUpdateOnlyOnExists())
+ {
+ updateIfExists(path, value, type.isMergeOnUpdate());
+ }
+ else
+ {
+ createOrUpdate(path, value, type.isMergeOnUpdate());
+ }
+ return true;
+ }
+ catch (PropertyStoreException e)
+ {
+ LOG.error("fail to update instance property, " +
+ " type:" + type + " keys:" + keys, e);
+ }
+ finally
+ {
+ _readWriteLock.writeLock().unlock();
+ }
+ return false;
+
+ }
+
+ @Override
+ public <T extends HelixProperty>
+ T getProperty(Class<T> clazz, PropertyType type, String... keys)
+ {
+ ZNRecord record = getProperty(type, keys);
+ if (record == null)
+ {
+ return null;
+ }
+ return HelixProperty.convertToTypedInstance(clazz, record);
+ }
+
+ @Override
+ public ZNRecord getProperty(PropertyType type, String... keys)
+ {
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+ try
+ {
+ _readWriteLock.readLock().lock();
+ return _store.getProperty(path);
+ }
+ catch(PropertyStoreException e)
+ {
+ LOG.error("Fail to get cluster property clusterName: " + _clusterName +
+ " type:" + type +
+ " keys:" + keys, e);
+ }
+ finally
+ {
+ _readWriteLock.readLock().unlock();
+ }
+ return null;
+ }
+
+ @Override
+ public boolean removeProperty(PropertyType type, String... keys)
+ {
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+ try
+ {
+ _readWriteLock.writeLock().lock();
+ _store.removeProperty(path);
+ return true;
+ }
+ catch (PropertyStoreException e)
+ {
+ LOG.error("Fail to remove instance property, " +
+ " type:" + type + " keys:" + keys, e);
+ }
+ finally
+ {
+ _readWriteLock.writeLock().unlock();
+ }
+
+ return false;
+ }
+
+ @Override
+ public List<String> getChildNames(PropertyType type, String... keys)
+ {
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+ try
+ {
+ _readWriteLock.readLock().lock();
+
+ List<String> childs = _store.getPropertyNames(path);
+ return childs;
+ }
+ catch(PropertyStoreException e)
+ {
+ LOG.error("Fail to get child names:" + _clusterName +
+ " parentPath:" + path + "\nexception: " + e);
+ }
+ finally
+ {
+ _readWriteLock.readLock().unlock();
+ }
+
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <T extends HelixProperty>
+ List<T> getChildValues(Class<T> clazz, PropertyType type, String... keys)
+ {
+ List<ZNRecord> list = getChildValues(type, keys);
+ return HelixProperty.convertToTypedList(clazz, list);
+ }
+
+ @Override
+ public List<ZNRecord> getChildValues(PropertyType type, String... keys)
+ {
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+ List<ZNRecord> records = new ArrayList<ZNRecord>();
+ try
+ {
+ _readWriteLock.readLock().lock();
+
+ List<String> childs = _store.getPropertyNames(path);
+ if (childs == null || childs.size() == 0)
+ {
+ return Collections.emptyList();
+ }
+
+ for (String child : childs)
+ {
+ ZNRecord record = _store.getProperty(child);
+ if (record != null)
+ {
+ records.add(record);
+ }
+ }
+ }
+ catch(PropertyStoreException e)
+ {
+ LOG.error("Fail to get child properties cluster:" + _clusterName +
+ " parentPath:" + path + "\nexception: " + e);
+ }
+ finally
+ {
+ _readWriteLock.readLock().unlock();
+ }
+ return records;
+ }
+
+ // HACK remove it later
+ public PropertyStore<ZNRecord> getStore()
+ {
+ return _store;
+ }
+
+ private void updateIfExists(String path, final ZNRecord record, boolean mergeOnUpdate)
+ throws PropertyStoreException
+ {
+ if (_store.exists(path))
+ {
+ _store.setProperty(path, record);
+ }
+ }
+
+ private void createOrUpdate(String path, final ZNRecord record, final boolean mergeOnUpdate)
+ throws PropertyStoreException
+ {
+ final int RETRYLIMIT = 3;
+ int retryCount = 0;
+ while (retryCount < RETRYLIMIT)
+ {
+ try
+ {
+ if (_store.exists(path))
+ {
+ DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
+ {
+ @Override
+ public ZNRecord update(ZNRecord currentData)
+ {
+ if(mergeOnUpdate)
+ {
+ currentData.merge(record);
+ return currentData;
+ }
+ return record;
+ }
+ };
+ _store.updatePropertyUntilSucceed(path, updater);
+
+ }
+ else
+ {
+ if(record.getDeltaList().size() > 0)
+ {
+ ZNRecord newRecord = new ZNRecord(record.getId());
+ newRecord.merge(record);
+ _store.setProperty(path, newRecord);
+ }
+ else
+ {
+ _store.setProperty(path, record);
+ }
+ }
+ break;
+ }
+ catch (Exception e)
+ {
+ retryCount = retryCount + 1;
+ LOG.warn("Exception trying to update " + path + " Exception:"
+ + e.getMessage() + ". Will retry.");
+ }
+ }
+ }
+
+ @Override
+ public <T extends HelixProperty> Map<String, T> getChildValuesMap(Class<T> clazz,
+ PropertyType type, String... keys)
+ {
+ List<T> list = getChildValues(clazz, type, keys);
+ return HelixProperty.convertListToMap(list);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixAdmin.java
new file mode 100644
index 0000000..58bff12
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixAdmin.java
@@ -0,0 +1,486 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ConfigScope;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+
+@Deprecated
+public class FileHelixAdmin implements HelixAdmin
+{
+ private static Logger logger =
+ Logger.getLogger(FileHelixAdmin.class);
+ private final FilePropertyStore<ZNRecord> _store;
+
+ /**
+  * @param store file property store that all admin operations read from and write to
+  */
+ public FileHelixAdmin(FilePropertyStore<ZNRecord> store)
+ {
+   this._store = store;
+ }
+
+ /**
+  * Not supported by the file-based admin.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public List<String> getClusters()
+ {
+   final String reason = "getClusters() is NOT supported by FileClusterManagementTool";
+   throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+  * Lists the instance names found under the cluster's PARTICIPANT config path. Each name
+  * is the part of the stored property name after its last '/'.
+  *
+  * @param clusterName cluster to inspect
+  * @return the instance names (empty when the store reports no children), or null if the
+  *         store threw a PropertyStoreException
+  */
+ @Override
+ public List<String> getInstancesInCluster(String clusterName)
+ {
+   String path =
+       PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                  clusterName,
+                                  ConfigScopeProperty.PARTICIPANT.toString());
+
+   List<String> instanceNames = new ArrayList<String>();
+   try
+   {
+     List<String> childs = _store.getPropertyNames(path);
+     // the store may hand back null instead of an empty list; treat both the same
+     if (childs != null)
+     {
+       for (String child : childs)
+       {
+         // strip config path from instanceName
+         instanceNames.add(child.substring(child.lastIndexOf('/') + 1));
+       }
+     }
+     return instanceNames;
+   }
+   catch (PropertyStoreException e)
+   {
+     logger.error("Fail to getInstancesInCluster, cluster " + clusterName, e);
+   }
+
+   return null;
+ }
+
+ /**
+  * Not supported by the file-based admin.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public List<String> getResourcesInCluster(String clusterName)
+ {
+   final String reason = "getResourcesInCluster() is NOT supported by FileClusterManagementTool";
+   throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+  * Creates the namespace layout for a cluster in the file store: ideal states, the three
+  * config scopes, property store, live instances, instances, external view, state model
+  * definitions (with a "MasterSlave" definition added), and the controller paths
+  * including an empty history record. Any existing namespace of the same name is removed
+  * first. A PropertyStoreException is logged and not rethrown.
+  * NOTE(review): overwritePrevRecord is not consulted; the namespace is always removed.
+  */
+ @Override
+ public void addCluster(String clusterName, boolean overwritePrevRecord)
+ {
+   try
+   {
+     _store.removeNamespace(clusterName);
+     _store.createPropertyNamespace(clusterName);
+     _store.createPropertyNamespace(HelixUtil.getIdealStatePath(clusterName));
+
+     // CONFIG's
+     final String clusterConfigPath =
+         PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                    clusterName,
+                                    ConfigScopeProperty.CLUSTER.toString(),
+                                    clusterName);
+     _store.setProperty(clusterConfigPath, new ZNRecord(clusterName));
+
+     final String participantConfigPath =
+         PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                    clusterName,
+                                    ConfigScopeProperty.PARTICIPANT.toString());
+     _store.createPropertyNamespace(participantConfigPath);
+
+     final String resourceConfigPath =
+         PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                    clusterName,
+                                    ConfigScopeProperty.RESOURCE.toString());
+     _store.createPropertyNamespace(resourceConfigPath);
+
+     // PROPERTY STORE
+     final String propertyStorePath =
+         PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
+     _store.createPropertyNamespace(propertyStorePath);
+
+     _store.createPropertyNamespace(HelixUtil.getLiveInstancesPath(clusterName));
+     _store.createPropertyNamespace(HelixUtil.getMemberInstancesPath(clusterName));
+     _store.createPropertyNamespace(HelixUtil.getExternalViewPath(clusterName));
+     _store.createPropertyNamespace(HelixUtil.getStateModelDefinitionPath(clusterName));
+
+     StateModelConfigGenerator generator = new StateModelConfigGenerator();
+     addStateModelDef(clusterName,
+                      "MasterSlave",
+                      new StateModelDefinition(generator.generateConfigForMasterSlave()));
+
+     // controller
+     _store.createPropertyNamespace(HelixUtil.getControllerPath(clusterName));
+
+     final String historyPath = PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName);
+     final ZNRecord emptyHistory = new ZNRecord(PropertyType.HISTORY.toString());
+     emptyHistory.setListField(clusterName, new ArrayList<String>());
+     _store.setProperty(historyPath, emptyHistory);
+
+     final String controllerMessagesPath =
+         PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName);
+     _store.createPropertyNamespace(controllerMessagesPath);
+
+     final String controllerStatusUpdatesPath =
+         PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName);
+     _store.createPropertyNamespace(controllerStatusUpdatesPath);
+
+     final String controllerErrorsPath =
+         PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName);
+     _store.createPropertyNamespace(controllerErrorsPath);
+   }
+   catch (PropertyStoreException e)
+   {
+     logger.error("Fail to add cluster " + clusterName, e);
+   }
+ }
+
+ /**
+  * Stores a new ideal state for the resource under the cluster's ideal-state path. The
+  * ideal state is created in AUTO mode with "0" replicas. A PropertyStoreException is
+  * logged and not rethrown.
+  *
+  * @param clusterName   cluster that owns the resource
+  * @param resource      resource name, also used as the last path segment
+  * @param numResources  number of partitions set on the ideal state
+  * @param stateModelRef state model definition reference set on the ideal state
+  */
+ @Override
+ public void addResource(String clusterName,
+                         String resource,
+                         int numResources,
+                         String stateModelRef)
+ {
+   final String targetPath = HelixUtil.getIdealStatePath(clusterName) + "/" + resource;
+
+   IdealState newIdealState = new IdealState(resource);
+   newIdealState.setNumPartitions(numResources);
+   newIdealState.setStateModelDefRef(stateModelRef);
+   newIdealState.setReplicas(Integer.toString(0));
+   newIdealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+
+   try
+   {
+     _store.setProperty(targetPath, newIdealState.getRecord());
+   }
+   catch (PropertyStoreException e)
+   {
+     logger.error("Fail to add resource, cluster:" + clusterName + " resourceName:"
+         + resource, e);
+   }
+ }
+
+ /**
+  * Not supported: the file-based admin cannot take an explicit ideal state mode.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public void addResource(String clusterName,
+                         String resource,
+                         int numResources,
+                         String stateModelRef,
+                         String idealStateMode)
+ {
+   final String reason = "ideal state mode not supported in file-based cluster manager";
+   throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+  * Not supported: the file-based admin cannot bucketize resources.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public void addResource(String clusterName,
+                         String resource,
+                         int numResources,
+                         String stateModelRef,
+                         String idealStateMode,
+                         int bucketSize)
+ {
+   final String reason = "bucketize not supported in file-based cluster manager";
+   throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+  * Registers an instance: stores its config under the cluster's PARTICIPANT config path
+  * and creates its message, current-state, errors and status-update namespaces. Any
+  * exception is logged (with its stack trace) and not rethrown.
+  *
+  * @param clusterName cluster to add the instance to
+  * @param config      instance config; its id becomes the node id
+  */
+ @Override
+ public void addInstance(String clusterName, InstanceConfig config)
+ {
+   String configsPath =
+       PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                  clusterName,
+                                  ConfigScopeProperty.PARTICIPANT.toString());
+   String nodeId = config.getId();
+   String nodeConfigPath = configsPath + "/" + nodeId;
+
+   try
+   {
+     _store.setProperty(nodeConfigPath, config.getRecord());
+     _store.createPropertyNamespace(HelixUtil.getMessagePath(clusterName, nodeId));
+     _store.createPropertyNamespace(HelixUtil.getCurrentStateBasePath(clusterName,
+                                                                      nodeId));
+     _store.createPropertyNamespace(HelixUtil.getErrorsPath(clusterName, nodeId));
+     _store.createPropertyNamespace(HelixUtil.getStatusUpdatesPath(clusterName, nodeId));
+   }
+   catch (Exception e)
+   {
+     // hand the exception to the logger so the stack trace is kept
+     logger.error("Fail to add node, cluster:" + clusterName + " node:" + nodeId, e);
+   }
+ }
+
+ /**
+  * Drops an instance by removing its config record from the cluster's PARTICIPANT config
+  * path. Any exception is logged and not rethrown.
+  * NOTE(review): the per-instance namespaces created by addInstance are left in place -
+  * confirm whether they should be removed as well.
+  *
+  * @param clusterName cluster the instance belongs to
+  * @param config      instance config; its id identifies the node to drop
+  */
+ @Override
+ public void dropInstance(String clusterName, InstanceConfig config)
+ {
+   String configsPath =
+       PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                  clusterName,
+                                  ConfigScopeProperty.PARTICIPANT.toString());
+   String nodeId = config.getId();
+   String nodeConfigPath = configsPath + "/" + nodeId;
+
+   try
+   {
+     // remove the config; the previous code re-wrote it, leaving the instance registered
+     _store.removeProperty(nodeConfigPath);
+   }
+   catch (Exception e)
+   {
+     logger.error("Fail to drop node, cluster:" + clusterName, e);
+   }
+ }
+
+ /**
+  * Reads the ideal state of a resource through a FileDataAccessor created for the cluster.
+  */
+ @Override
+ public IdealState getResourceIdealState(String clusterName, String resourceName)
+ {
+   FileDataAccessor accessor = new FileDataAccessor(_store, clusterName);
+   return accessor.getProperty(IdealState.class, PropertyType.IDEALSTATES, resourceName);
+ }
+
+ @Override
+ public void setResourceIdealState(String clusterName,
+ String resourceName,
+ IdealState idealState)
+ {
+ new FileDataAccessor(_store, clusterName).setProperty(PropertyType.IDEALSTATES,
+ idealState,
+ resourceName);
+ }
+
+ /**
+  * Not supported by the file-based admin.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public void enableInstance(String clusterName, String instanceName, boolean enabled)
+ {
+   final String reason = "enableInstance() is NOT supported by FileClusterManagementTool";
+   throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+  * Not supported by the file-based admin.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public void enableCluster(String clusterName, boolean enabled)
+ {
+   final String reason = "enableCluster() is NOT supported by FileClusterManagementTool";
+   throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+  * Stores a state model definition under the cluster's state-model-definition path,
+  * using stateModelDef as the last path segment. A PropertyStoreException is logged and
+  * not rethrown.
+  */
+ @Override
+ public void addStateModelDef(String clusterName,
+                              String stateModelDef,
+                              StateModelDefinition stateModel)
+ {
+   final String targetPath =
+       HelixUtil.getStateModelDefinitionPath(clusterName) + "/" + stateModelDef;
+
+   try
+   {
+     _store.setProperty(targetPath, stateModel.getRecord());
+   }
+   catch (PropertyStoreException e)
+   {
+     logger.error("Fail to addStateModelDef, cluster:" + clusterName + " stateModelDef:"
+         + stateModelDef, e);
+   }
+ }
+
+ /**
+ * Removes a resource's ideal state through a {@link FileDataAccessor} created
+ * over this tool's store.
+ *
+ * @param clusterName cluster the accessor is created for
+ * @param resourceName key removed under {@link PropertyType#IDEALSTATES}
+ */
+ @Override
+ public void dropResource(String clusterName, String resourceName)
+ {
+ FileDataAccessor accessor = new FileDataAccessor(_store, clusterName);
+ accessor.removeProperty(PropertyType.IDEALSTATES, resourceName);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public List<String> getStateModelDefs(String clusterName)
+ {
+ final String reason = "getStateModelDefs() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public InstanceConfig getInstanceConfig(String clusterName, String instanceName)
+ {
+ final String reason = "getInstanceConfig() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public StateModelDefinition getStateModelDef(String clusterName, String stateModelName)
+ {
+ final String reason = "getStateModelDef() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public ExternalView getResourceExternalView(String clusterName, String resource)
+ {
+ final String reason = "getResourceExternalView() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void enablePartition(boolean enabled,
+ String clusterName,
+ String instanceName,
+ String resourceName,
+ List<String> partitionNames)
+ {
+ final String reason = "enablePartition() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void resetPartition(String clusterName,
+ String instanceName,
+ String resourceName,
+ List<String> partitionNames)
+ {
+ final String reason = "resetPartition() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void resetInstance(String clusterName, List<String> instanceNames)
+ {
+ final String reason = "resetInstance() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void resetResource(String clusterName, List<String> resourceNames)
+ {
+ final String reason = "resetResource() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void addStat(String clusterName, String statName)
+ {
+ final String reason = "addStat() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void addAlert(String clusterName, String alertName)
+ {
+ final String reason = "addAlert() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void dropStat(String clusterName, String statName)
+ {
+ final String reason = "dropStat() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void dropAlert(String clusterName, String alertName)
+ {
+ final String reason = "dropAlert() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void dropCluster(String clusterName)
+ {
+ final String reason = "dropCluster() is NOT supported by FileClusterManagementTool";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void addClusterToGrandCluster(String clusterName, String grandCluster)
+ {
+ // The message previously named a different method (addCluster with three arguments);
+ // it now names the method that was actually called.
+ throw new UnsupportedOperationException("addClusterToGrandCluster() is NOT supported by FileClusterManagementTool");
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void setConfig(ConfigScope scope, Map<String, String> properties)
+ {
+ // Message names the method and tool, matching the other unsupported operations in this class.
+ throw new UnsupportedOperationException("setConfig() is NOT supported by FileClusterManagementTool");
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public Map<String, String> getConfig(ConfigScope scope, Set<String> keys)
+ {
+ // Message names the method and tool, matching the other unsupported operations in this class.
+ throw new UnsupportedOperationException("getConfig() is NOT supported by FileClusterManagementTool");
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public List<String> getConfigKeys(ConfigScopeProperty scope,
+ String clusterName,
+ String... keys)
+ {
+ // Message names the method and tool, matching the other unsupported operations in this class.
+ throw new UnsupportedOperationException("getConfigKeys() is NOT supported by FileClusterManagementTool");
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void removeConfig(ConfigScope scope, Set<String> keys)
+ {
+ // Message names the method and tool, matching the other unsupported operations in this class.
+ throw new UnsupportedOperationException("removeConfig() is NOT supported by FileClusterManagementTool");
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void rebalance(String clusterName, String resourceName, int replica)
+ {
+ // Message names the method and tool, matching the other unsupported operations in this class.
+ throw new UnsupportedOperationException("rebalance() is NOT supported by FileClusterManagementTool");
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ * @throws IOException declared by the signature; never thrown by this implementation
+ */
+ @Override
+ public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException
+ {
+ // Message names the method and tool, matching the other unsupported operations in this class.
+ throw new UnsupportedOperationException("addIdealState() is NOT supported by FileClusterManagementTool");
+ }
+
+ /**
+ * File-based overload of addStateModelDef; not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ * @throws IOException declared by the signature; never thrown by this implementation
+ */
+ @Override
+ public void addStateModelDef(String clusterName,
+ String stateModelDefName,
+ String stateModelDefFile) throws IOException
+ {
+ // Message names the method and tool, matching the other unsupported operations in this class.
+ throw new UnsupportedOperationException("addStateModelDef(clusterName, stateModelDefName, stateModelDefFile) is NOT supported by FileClusterManagementTool");
+ }
+
+ /**
+ * Not available in the file-based tool.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void addMessageConstraint(String clusterName,
+ String constraintId,
+ Map<String, String> constraints)
+ {
+ // Message names the method and tool, matching the other unsupported operations in this class.
+ throw new UnsupportedOperationException("addMessageConstraint() is NOT supported by FileClusterManagementTool");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixDataAccessor.java
new file mode 100644
index 0000000..9b3db9b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/FileHelixDataAccessor.java
@@ -0,0 +1,351 @@
+package org.apache.helix.manager.file;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.log4j.Logger;
+
+
+/**
+ * {@link HelixDataAccessor} implementation that keeps every property in a
+ * {@link FilePropertyStore}.
+ *
+ * Read operations take the read lock and mutating operations take the write lock of
+ * a single {@link ReentrantReadWriteLock} owned by this accessor. Store failures are
+ * logged and reported through the return value (false, null or an empty/partial
+ * list) rather than thrown.
+ */
+@Deprecated
+public class FileHelixDataAccessor implements HelixDataAccessor
+{
+ private static final Logger LOG = Logger.getLogger(FileHelixDataAccessor.class);
+
+ private final FilePropertyStore<ZNRecord> _store;
+ private final String _clusterName;
+ private final ReadWriteLock _readWriteLock = new ReentrantReadWriteLock();
+ private final Builder _propertyKeyBuilder;
+
+
+ /**
+ * @param store backing file property store
+ * @param clusterName cluster name used for the key builder and in log messages
+ */
+ public FileHelixDataAccessor(FilePropertyStore<ZNRecord> store,
+ String clusterName)
+ {
+ _store = store;
+ _clusterName = clusterName;
+ _propertyKeyBuilder = new PropertyKey.Builder(_clusterName);
+ }
+
+ /**
+ * Creates a property; this implementation simply delegates to
+ * {@link #updateProperty(PropertyKey, HelixProperty)}.
+ */
+ @Override
+ public boolean createProperty(PropertyKey key, HelixProperty value)
+ {
+ return updateProperty(key, value);
+ }
+
+ /**
+ * Overwrites the record stored at the key's path.
+ *
+ * @return true on success, false if the store threw a {@link PropertyStoreException}
+ */
+ @Override
+ public <T extends HelixProperty> boolean setProperty(PropertyKey key, T value)
+ {
+ String path = key.getPath();
+ // Acquire before entering try so that finally never unlocks a lock that is not held.
+ _readWriteLock.writeLock().lock();
+ try
+ {
+ _store.setProperty(path, value.getRecord());
+ return true;
+ }
+ catch(PropertyStoreException e)
+ {
+ LOG.error("Fail to set cluster property clusterName: " + _clusterName +
+ " type:" + key.getType() +
+ " keys:" + Arrays.toString(key.getParams()), e);
+ return false;
+ }
+ finally
+ {
+ _readWriteLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Updates the record at the key's path. Property types flagged as
+ * update-only-on-exists are written only when the path already exists; all other
+ * types are created or updated.
+ *
+ * @return true on success, false if the store reported a {@link PropertyStoreException}
+ */
+ @Override
+ public <T extends HelixProperty> boolean updateProperty(PropertyKey key,
+ T value)
+ {
+ PropertyType type = key.getType();
+ String path = key.getPath();
+
+ // Acquire before entering try so that finally never unlocks a lock that is not held.
+ _readWriteLock.writeLock().lock();
+ try
+ {
+ if (type.isUpdateOnlyOnExists())
+ {
+ updateIfExists(path, value.getRecord(), type.isMergeOnUpdate());
+ }
+ else
+ {
+ createOrUpdate(path, value.getRecord(), type.isMergeOnUpdate());
+ }
+ return true;
+ }
+ catch (PropertyStoreException e)
+ {
+ LOG.error("fail to update property. type:" +
+ type + ", keys:" + Arrays.toString(key.getParams()), e);
+ return false;
+ }
+ finally
+ {
+ _readWriteLock.writeLock().unlock();
+ }
+
+ }
+
+ /**
+ * Reads the record at the key's path and converts it to the key's typed class.
+ *
+ * @return the typed property, or null if no record is stored or the store threw
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends HelixProperty> T getProperty(PropertyKey key)
+ {
+ String path = key.getPath();
+ _readWriteLock.readLock().lock();
+ try
+ {
+ ZNRecord record = _store.getProperty(path);
+ if (record == null)
+ {
+ return null;
+ }
+ return (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
+ }
+ catch(PropertyStoreException e)
+ {
+ LOG.error("Fail to get property. clusterName: " + _clusterName +
+ " type:" + key.getType() +
+ " keys:" + Arrays.toString(key.getParams()), e);
+ return null;
+ }
+ finally
+ {
+ _readWriteLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Removes the record at the key's path.
+ *
+ * @return true on success, false if the store threw a {@link PropertyStoreException}
+ */
+ @Override
+ public boolean removeProperty(PropertyKey key)
+ {
+ String path = key.getPath();
+
+ _readWriteLock.writeLock().lock();
+ try
+ {
+ _store.removeProperty(path);
+ return true;
+ }
+ catch (PropertyStoreException e)
+ {
+ LOG.error("Fail to remove property. type:" +
+ key.getType() + ", keys:" + Arrays.toString(key.getParams()), e);
+ return false;
+ }
+ finally
+ {
+ _readWriteLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Lists the property names the store reports under the key's path.
+ *
+ * @return the names as returned by the store, or an empty list if the store threw
+ */
+ @Override
+ public List<String> getChildNames(PropertyKey key)
+ {
+ String path = key.getPath();
+
+ _readWriteLock.readLock().lock();
+ try
+ {
+ return _store.getPropertyNames(path);
+ }
+ catch(PropertyStoreException e)
+ {
+ LOG.error("Fail to get child names. clusterName: " + _clusterName +
+ ", parentPath:" + path, e);
+
+ return Collections.emptyList();
+ }
+ finally
+ {
+ _readWriteLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Reads every child record under the key's path and converts each one to the
+ * key's typed class. Children without a stored record are skipped.
+ *
+ * @return the typed children; on a store failure, whatever was collected before
+ * the failure (possibly empty)
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
+ {
+ String path = key.getPath();
+ List<T> records = new ArrayList<T>();
+ _readWriteLock.readLock().lock();
+ try
+ {
+ List<String> childs = _store.getPropertyNames(path);
+ if (childs == null || childs.size() == 0)
+ {
+ return Collections.emptyList();
+ }
+
+ for (String child : childs)
+ {
+ // The names returned by the store are used directly as lookup keys.
+ ZNRecord record = _store.getProperty(child);
+ if (record != null)
+ {
+ records.add((T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record));
+ }
+ }
+ }
+ catch(PropertyStoreException e)
+ {
+ LOG.error("Fail to get child properties. clusterName:" + _clusterName +
+ ", parentPath:" + path, e);
+ }
+ finally
+ {
+ _readWriteLock.readLock().unlock();
+ }
+
+ return records;
+ }
+
+ /**
+ * Same as {@link #getChildValues(PropertyKey)}, with the list converted to a map
+ * by {@link HelixProperty#convertListToMap}.
+ */
+ @Override
+ public <T extends HelixProperty> Map<String, T> getChildValuesMap(
+ PropertyKey key)
+ {
+ List<T> list = getChildValues(key);
+ return HelixProperty.convertListToMap(list);
+ }
+
+ /**
+ * @return the key builder created for this accessor's cluster name
+ */
+ @Override
+ public Builder keyBuilder()
+ {
+ return _propertyKeyBuilder;
+ }
+
+ /**
+ * Calls {@link #createProperty(PropertyKey, HelixProperty)} for each key/child pair.
+ *
+ * @param keys keys to create; children.get(i) is stored under keys.get(i)
+ * @param children values, expected to have at least as many elements as keys
+ * @return per-key success flags, in the order of keys
+ */
+ @Override
+ public <T extends HelixProperty> boolean[] createChildren(
+ List<PropertyKey> keys, List<T> children)
+ {
+ boolean[] success = new boolean[keys.size()];
+ for (int i = 0; i < keys.size(); i++)
+ {
+ success[i] = createProperty(keys.get(i), children.get(i));
+ }
+ return success;
+ }
+
+ /**
+ * Calls {@link #setProperty(PropertyKey, HelixProperty)} for each key/child pair.
+ *
+ * @param keys keys to set; children.get(i) is stored under keys.get(i)
+ * @param children values, expected to have at least as many elements as keys
+ * @return per-key success flags, in the order of keys
+ */
+ @Override
+ public <T extends HelixProperty> boolean[] setChildren(
+ List<PropertyKey> keys, List<T> children)
+ {
+ boolean[] success = new boolean[keys.size()];
+ for (int i = 0; i < keys.size(); i++)
+ {
+ success[i] = setProperty(keys.get(i), children.get(i));
+ }
+ return success;
+ }
+
+ /**
+ * @throws UnsupportedOperationException always; this accessor has no base accessor
+ */
+ @Override
+ public BaseDataAccessor getBaseDataAccessor()
+ {
+ throw new UnsupportedOperationException("No BaseDataAccessor for FileHelixDataAccessor");
+ }
+
+ // HACK remove it later
+ // Exposes the backing store directly; calls made on it bypass this accessor's lock.
+ public PropertyStore<ZNRecord> getStore()
+ {
+ return _store;
+ }
+
+ /**
+ * Writes a record, creating the path if it does not exist yet.
+ *
+ * If the path exists the stored record is either merged with the new one
+ * (mergeOnUpdate) or replaced by it. If the path does not exist the record is
+ * stored, after folding its delta list into a fresh record when it has one.
+ * The whole operation is attempted up to three times.
+ *
+ * @throws PropertyStoreException if every attempt failed; previously the failure
+ * was swallowed and callers reported success
+ */
+ private void createOrUpdate(String path, final ZNRecord record, final boolean mergeOnUpdate)
+ throws PropertyStoreException
+ {
+ final int RETRYLIMIT = 3;
+ Exception lastFailure = null;
+ for (int attempt = 0; attempt < RETRYLIMIT; attempt++)
+ {
+ try
+ {
+ if (_store.exists(path))
+ {
+ DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
+ {
+ @Override
+ public ZNRecord update(ZNRecord currentData)
+ {
+ if(mergeOnUpdate)
+ {
+ currentData.merge(record);
+ return currentData;
+ }
+ return record;
+ }
+ };
+ _store.updatePropertyUntilSucceed(path, updater);
+
+ }
+ else
+ {
+ if(record.getDeltaList().size() > 0)
+ {
+ // Apply the deltas to an empty record so the stored value is the merged result.
+ ZNRecord newRecord = new ZNRecord(record.getId());
+ newRecord.merge(record);
+ _store.setProperty(path, newRecord);
+ }
+ else
+ {
+ _store.setProperty(path, record);
+ }
+ }
+ return;
+ }
+ catch (Exception e)
+ {
+ lastFailure = e;
+ // Pass the exception itself so the stack trace is not lost.
+ LOG.warn("Exception trying to update " + path + " Exception:"
+ + e.getMessage() + ". Will retry.", e);
+ }
+ }
+
+ // Every attempt failed: surface the failure so updateProperty() can return false.
+ if (lastFailure instanceof PropertyStoreException)
+ {
+ throw (PropertyStoreException) lastFailure;
+ }
+ throw new PropertyStoreException("Fail to update " + path + " after " + RETRYLIMIT
+ + " attempts: " + lastFailure);
+ }
+
+ /**
+ * Overwrites the record at the path, but only if the path already exists.
+ *
+ * NOTE(review): mergeOnUpdate is currently ignored - the record always replaces
+ * the stored one. Confirm whether a merge is intended when the flag is set.
+ */
+ private void updateIfExists(String path, final ZNRecord record, boolean mergeOnUpdate)
+ throws PropertyStoreException
+ {
+ if (_store.exists(path))
+ {
+ _store.setProperty(path, record);
+ }
+ }
+
+ /**
+ * Not implemented.
+ *
+ * @return always null; callers must not dereference the result
+ */
+ @Override
+ public <T extends HelixProperty> boolean[] updateChildren(List<String> paths,
+ List<DataUpdater<ZNRecord>> updaters, int options)
+ {
+ // TODO not implemented for the file-based accessor
+ return null;
+ }
+
+ /**
+ * Reads several properties by calling {@link #getProperty(PropertyKey)} for each key.
+ *
+ * @return one entry per key, in order; an entry is null where the single lookup returned null
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys)
+ {
+ List<T> list = new ArrayList<T>();
+ for (PropertyKey key : keys)
+ {
+ list.add((T)getProperty(key));
+ }
+ return list;
+ }
+
+}