You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:40 UTC
[13/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/StatHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/StatHealthReportProvider.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/StatHealthReportProvider.java
deleted file mode 100644
index 720eea5..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/StatHealthReportProvider.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-public class StatHealthReportProvider extends HealthReportProvider
-{
-
- private static final Logger _logger = Logger
- .getLogger(StatHealthReportProvider.class);
-
- /*
- * public final static String _testStat = "testStat"; public final static
- * String _readLatencyStat = "readLatencyStat"; public final static String
- * _requestCountStat = "requestCountStat"; public final static String
- * _partitionRequestCountStat = "partitionRequestCountStat";
- */
-
- public static final String REPORT_NAME = "ParticipantStats";
- public String _reportName = REPORT_NAME;
-
- public static final String STAT_VALUE = "value";
- public static final String TIMESTAMP = "timestamp";
-
- public int readLatencyCount = 0;
- public double readLatencySum = 0;
-
- public int requestCount = 0;
-
- // private final Map<String, String> _partitionCountsMap = new HashMap<String,
- // String>();
-
- // private final Map<String, HashMap<String,String>> _partitionStatMaps = new
- // HashMap<String, HashMap<String,String>>();
- private final ConcurrentHashMap<String, String> _statsToValues = new ConcurrentHashMap<String, String>();
- private final ConcurrentHashMap<String, String> _statsToTimestamps = new ConcurrentHashMap<String, String>();
-
- public StatHealthReportProvider()
- {
- }
-
- @Override
- public Map<String, String> getRecentHealthReport()
- {
- return null;
- }
-
- // TODO: function is misnamed, but return type is what I want
- @Override
- public Map<String, Map<String, String>> getRecentPartitionHealthReport()
- {
- Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
- for (String stat : _statsToValues.keySet())
- {
- Map<String, String> currStat = new HashMap<String, String>();
- /*
- * currStat.put(Stat.OP_TYPE, stat._opType);
- * currStat.put(Stat.MEASUREMENT_TYPE, stat._measurementType);
- * currStat.put(Stat.NODE_NAME, stat._nodeName);
- * currStat.put(Stat.PARTITION_NAME, stat._partitionName);
- * currStat.put(Stat.RESOURCE_NAME, stat._resourceName);
- * currStat.put(Stat.RETURN_STATUS, stat._returnStatus);
- * currStat.put(Stat.METRIC_NAME, stat._metricName);
- * currStat.put(Stat.AGG_TYPE, stat._aggTypeName);
- */
- currStat.put(TIMESTAMP, _statsToTimestamps.get(stat));
- currStat.put(STAT_VALUE, _statsToValues.get(stat));
- result.put(stat, currStat);
- }
- return result;
- }
-
- public boolean contains(Stat inStat)
- {
- return _statsToValues.containsKey(inStat);
- }
-
- public Set<String> keySet()
- {
- return _statsToValues.keySet();
- }
-
- public String getStatValue(Stat inStat)
- {
- return _statsToValues.get(inStat);
- }
-
- public long getStatTimestamp(Stat inStat)
- {
- return Long.parseLong(_statsToTimestamps.get(inStat));
- }
-
- /*
- * public String getStatValue(String opType, String measurementType, String
- * resourceName, String partitionName, String nodeName, boolean
- * createIfMissing) { Stat rs = new Stat(opType, measurementType,
- * resourceName, partitionName, nodeName); String val =
- * _statsToValues.get(rs); if (val == null && createIfMissing) { val = "0";
- * _statsToValues.put(rs, val); } return val; }
- */
-
- public void writeStat(String statName, String val, String timestamp)
- {
- _statsToValues.put(statName, val);
- _statsToTimestamps.put(statName, timestamp);
- }
-
- /*
- * public void setStat(Stat es, String val, String timestamp) { writeStat(es,
- * val, timestamp); }
- *
- * public void setStat(String opType, String measurementType, String
- * resourceName, String partitionName, String nodeName, double val, String
- * timestamp) { Stat rs = new Stat(opType, measurementType, resourceName,
- * partitionName, nodeName); writeStat(rs, String.valueOf(val), timestamp); }
- */
-
- public void incrementStat(String statName, String timestamp)
- {
- // Stat rs = new Stat(opType, measurementType, resourceName, partitionName,
- // nodeName);
- String val = _statsToValues.get(statName);
- if (val == null)
- {
- val = "0";
- }
- else
- {
- val = String.valueOf(Double.parseDouble(val) + 1);
- }
- writeStat(statName, val, timestamp);
- }
-
- public int size()
- {
- return _statsToValues.size();
- }
-
- public void resetStats()
- {
- _statsToValues.clear();
- _statsToTimestamps.clear();
- }
-
- public void setReportName(String name)
- {
- _reportName = name;
- }
-
- public String getReportName()
- {
- return _reportName;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/WindowAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/WindowAggregationType.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/WindowAggregationType.java
deleted file mode 100644
index 0bb488a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/WindowAggregationType.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.util.TimerTask;
-
-import org.apache.log4j.Logger;
-
-/**
- * Aggregation type that keeps a sliding window of the most recent values.
- * The window is stored as one string whose entries are joined by
- * {@link #WINDOW_DELIM}, oldest entry first.
- */
-public class WindowAggregationType implements AggregationType
-{
-
-  private static final Logger logger = Logger
-      .getLogger(WindowAggregationType.class);
-
-  public final String WINDOW_DELIM = "#";
-
-  public final static String TYPE_NAME = "window";
-
-  int _windowSize = 1;
-
-  /**
-   * @param ws maximum number of values kept in the window
-   */
-  public WindowAggregationType(int ws)
-  {
-    super();
-    _windowSize = ws;
-  }
-
-  /**
-   * @return {@link #TYPE_NAME}, then DELIM, then the window size
-   */
-  @Override
-  public String getName()
-  {
-    return new StringBuilder()
-        .append(TYPE_NAME)
-        .append(DELIM)
-        .append(_windowSize)
-        .toString();
-  }
-
-  /**
-   * Appends the incoming value to the window, evicting the oldest entry when
-   * the window is already full.
-   *
-   * @param incomingVal   newest value
-   * @param existingVal   current window contents, or null if there are none
-   * @param prevTimestamp not used by this aggregation type
-   * @return the new window contents
-   */
-  @Override
-  public String merge(String incomingVal, String existingVal, long prevTimestamp)
-  {
-    if (existingVal == null)
-    {
-      return incomingVal;
-    }
-
-    int currLength = existingVal.split(WINDOW_DELIM).length;
-    if (currLength < _windowSize)
-    {
-      // window not full
-      return existingVal + WINDOW_DELIM + incomingVal;
-    }
-
-    // evict oldest
-    int firstDelim = existingVal.indexOf(WINDOW_DELIM);
-    if (firstDelim < 0)
-    {
-      // The full window holds a single entry (window size 1): the only entry
-      // is the oldest one, so the incoming value replaces it. Without this
-      // guard substring(0) would keep the old entry and the window would
-      // grow to two values.
-      return incomingVal;
-    }
-    return existingVal.substring(firstDelim + 1) + WINDOW_DELIM + incomingVal;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/package-info.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/package-info.java
deleted file mode 100644
index dd64f06..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Helix health check classes
- *
- */
-package com.linkedin.helix.healthcheck;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/ClusterJosqlQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/ClusterJosqlQueryProcessor.java b/helix-core/src/main/java/com/linkedin/helix/josql/ClusterJosqlQueryProcessor.java
deleted file mode 100644
index d96598a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/ClusterJosqlQueryProcessor.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.josql;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.josql.Query;
-import org.josql.QueryExecutionException;
-import org.josql.QueryParseException;
-import org.josql.QueryResults;
-
-import com.linkedin.helix.ConfigScope.ConfigScopeProperty;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.LiveInstance.LiveInstanceProperty;
-
-/**
- * Runs JoSQL queries over cluster metadata read through a {@link HelixManager}.
- * Every run reads the instance configs, live instances, state model
- * definitions, ideal states, external views and current states, and binds them
- * to the query as variables, both as ZNRecords and as flattened ZNRecordRows
- * (variable name suffixed with {@link #FLATTABLE}).
- */
-public class ClusterJosqlQueryProcessor
-{
-  public static final String PARTITIONS = "PARTITIONS";
-  public static final String FLATTABLE = ".Table";
-
-  HelixManager _manager;
-  private static Logger _logger = Logger.getLogger(ClusterJosqlQueryProcessor.class);
-
-  public ClusterJosqlQueryProcessor(HelixManager manager)
-  {
-    _manager = manager;
-  }
-
-  /**
-   * Extracts the token that follows the first "FROM" in the query.
-   *
-   * @param sql the query text
-   * @return the non-empty FROM target
-   * @throws HelixException if the query has no "FROM" or nothing follows it
-   */
-  String parseFromTarget(String sql)
-  {
-    // We need to find out the "FROM" target, and replace it with liveInstances
-    // / partitions etc
-    int fromIndex = sql.indexOf("FROM");
-    if (fromIndex == -1)
-    {
-      throw new HelixException("Query must contain FROM target. Query: " + sql);
-    }
-
-    // Skip to the first non-space character after the blank that follows FROM.
-    // Both steps are bounds-checked so that a query ending in "FROM" or in
-    // "FROM   " is reported as an empty target instead of failing with a
-    // StringIndexOutOfBoundsException.
-    int targetStart = sql.indexOf(" ", fromIndex);
-    if (targetStart == -1)
-    {
-      targetStart = sql.length();
-    }
-    while (targetStart < sql.length() && sql.charAt(targetStart) == ' ')
-    {
-      targetStart++;
-    }
-
-    int targetEnd = sql.indexOf(" ", targetStart);
-    if (targetEnd == -1)
-    {
-      targetEnd = sql.length();
-    }
-    String fromTarget = sql.substring(targetStart, targetEnd).trim();
-
-    if (fromTarget.length() == 0)
-    {
-      throw new HelixException("FROM target in the query cannot be empty. Query: " + sql);
-    }
-    return fromTarget;
-  }
-
-  /**
-   * Runs a query against a caller-supplied list of rows. The query text is
-   * parsed as given; its FROM target is not rewritten.
-   *
-   * @param josql                      the JoSQL query
-   * @param bindVariables              extra bind variables, may be null
-   * @param additionalFunctionHandlers extra function handlers, may be null
-   * @param queryTarget                the rows to query
-   * @return the query results
-   */
-  public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
-      List<Object> additionalFunctionHandlers, List queryTarget) throws QueryParseException,
-      QueryExecutionException
-  {
-    Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
-
-    josqlQuery.parse(josql);
-    QueryResults qr = josqlQuery.execute(queryTarget);
-
-    return qr.getResults();
-  }
-
-  /**
-   * Creates a query with the cluster metadata bound as variables and the
-   * function handlers registered.
-   *
-   * @param bindVariables              extra bind variables, may be null
-   * @param additionalFunctionHandlers extra function handlers, may be null
-   * @return the prepared, not yet parsed, query
-   */
-  Query prepareQuery(Map<String, Object> bindVariables, List<Object> additionalFunctionHandlers)
-  {
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
-    // Get all the ZNRecords in the cluster and set them as bind variables
-    Builder keyBuilder = accessor.keyBuilder();
-
-    List<ZNRecord> instanceConfigs =
-        HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
-    List<ZNRecord> liveInstances =
-        HelixProperty.convertToList(accessor.getChildValues(keyBuilder.liveInstances()));
-    List<ZNRecord> stateModelDefs =
-        HelixProperty.convertToList(accessor.getChildValues(keyBuilder.stateModelDefs()));
-
-    // Idealstates are stored in a map from resource name to idealState ZNRecord
-    List<ZNRecord> idealStateList =
-        HelixProperty.convertToList(accessor.getChildValues(keyBuilder.idealStates()));
-
-    Map<String, ZNRecord> idealStatesMap = new HashMap<String, ZNRecord>();
-    for (ZNRecord idealState : idealStateList)
-    {
-      idealStatesMap.put(idealState.getId(), idealState);
-    }
-
-    // Make up the partition list: one empty ZNRecord per map-field key of
-    // every ideal state, for selecting partitions
-    List<ZNRecord> partitions = new ArrayList<ZNRecord>();
-    for (ZNRecord idealState : idealStateList)
-    {
-      for (String partitionName : idealState.getMapFields().keySet())
-      {
-        partitions.add(new ZNRecord(partitionName));
-      }
-    }
-
-    // ExternalViews are stored in a map from resource name to externalView
-    // ZNRecord
-    List<ZNRecord> externalViewList =
-        HelixProperty.convertToList(accessor.getChildValues(keyBuilder.externalViews()));
-    Map<String, ZNRecord> externalViewMap = new HashMap<String, ZNRecord>();
-    for (ZNRecord externalView : externalViewList)
-    {
-      externalViewMap.put(externalView.getId(), externalView);
-    }
-
-    // Map from instance name to a map from resource to current state ZNRecord
-    Map<String, Map<String, ZNRecord>> currentStatesMap =
-        new HashMap<String, Map<String, ZNRecord>>();
-    // Map from instance name to a list of combined flat ZNRecordRow
-    Map<String, List<ZNRecordRow>> flatCurrentStateMap =
-        new HashMap<String, List<ZNRecordRow>>();
-
-    for (ZNRecord instance : liveInstances)
-    {
-      String host = instance.getId();
-      String sessionId = instance.getSimpleField(LiveInstanceProperty.SESSION_ID.toString());
-      Map<String, ZNRecord> currentStates = new HashMap<String, ZNRecord>();
-      List<ZNRecord> instanceCurrentStateList = new ArrayList<ZNRecord>();
-      for (ZNRecord idealState : idealStateList)
-      {
-        String resourceName = idealState.getId();
-
-        HelixProperty property =
-            accessor.getProperty(keyBuilder.currentState(host, sessionId, resourceName));
-        ZNRecord currentState;
-        if (property == null)
-        {
-          // No current state stored for this resource on this instance:
-          // substitute an empty record so every resource has an entry.
-          _logger.warn("Resource " + resourceName + " has null currentState");
-          currentState = new ZNRecord(resourceName);
-        }
-        else
-        {
-          currentState = property.getRecord();
-        }
-        currentStates.put(resourceName, currentState);
-        instanceCurrentStateList.add(currentState);
-      }
-      currentStatesMap.put(host, currentStates);
-      flatCurrentStateMap.put(host, ZNRecordRow.flatten(instanceCurrentStateList));
-    }
-
-    Query josqlQuery = new Query();
-    String participantConfigs =
-        PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString();
-
-    // Set the default bind variables
-    josqlQuery.setVariable(participantConfigs, instanceConfigs);
-    josqlQuery.setVariable(PropertyType.IDEALSTATES.toString(), idealStatesMap);
-    josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString(), liveInstances);
-    josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString(), stateModelDefs);
-    josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString(), externalViewMap);
-    josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString(), currentStatesMap);
-    josqlQuery.setVariable(PARTITIONS, partitions);
-
-    // Flat version of ZNRecords
-    josqlQuery.setVariable(participantConfigs + FLATTABLE,
-        ZNRecordRow.flatten(instanceConfigs));
-    josqlQuery.setVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE,
-        ZNRecordRow.flatten(idealStateList));
-    josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE,
-        ZNRecordRow.flatten(liveInstances));
-    josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE,
-        ZNRecordRow.flatten(stateModelDefs));
-    josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE,
-        ZNRecordRow.flatten(externalViewList));
-    josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString() + FLATTABLE,
-        flatCurrentStateMap.values());
-    josqlQuery.setVariable(PARTITIONS + FLATTABLE, ZNRecordRow.flatten(partitions));
-
-    // Set additional bind variables
-    if (bindVariables != null)
-    {
-      for (Map.Entry<String, Object> variable : bindVariables.entrySet())
-      {
-        josqlQuery.setVariable(variable.getKey(), variable.getValue());
-      }
-    }
-
-    josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
-    josqlQuery.addFunctionHandler(new ZNRecordRow());
-    josqlQuery.addFunctionHandler(Integer.valueOf(0));
-    if (additionalFunctionHandlers != null)
-    {
-      for (Object functionHandler : additionalFunctionHandlers)
-      {
-        josqlQuery.addFunctionHandler(functionHandler);
-      }
-    }
-    return josqlQuery;
-  }
-
-  /**
-   * Maps a FROM target, compared ignoring case, to the name of the bind
-   * variable that holds its rows.
-   *
-   * @param fromTargetString the FROM target as written in the query
-   * @return the bind variable name, or null if the target is not a known table
-   */
-  private static String resolveTargetVariable(String fromTargetString)
-  {
-    String[] tableNames =
-        {
-            PARTITIONS,
-            PropertyType.LIVEINSTANCES.toString(),
-            PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString(),
-            PropertyType.STATEMODELDEFS.toString(),
-            PropertyType.EXTERNALVIEW.toString(),
-            PropertyType.IDEALSTATES.toString()
-        };
-    for (String tableName : tableNames)
-    {
-      if (fromTargetString.equalsIgnoreCase(tableName))
-      {
-        return tableName;
-      }
-      if (fromTargetString.equalsIgnoreCase(tableName + FLATTABLE))
-      {
-        return tableName + FLATTABLE;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Runs a query whose FROM target names one of the built-in tables
-   * (PARTITIONS, LIVEINSTANCES, CONFIGS/PARTICIPANT, STATEMODELDEFS,
-   * EXTERNALVIEW, IDEALSTATES, or any of these suffixed with
-   * {@link #FLATTABLE}).
-   *
-   * @param josql                      the JoSQL query
-   * @param bindVariables              extra bind variables, may be null
-   * @param additionalFunctionHandlers extra function handlers, may be null
-   * @return the query results
-   * @throws HelixException if the FROM target is missing or unknown
-   */
-  @SuppressWarnings("unchecked")
-  public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
-      List<Object> additionalFunctionHandlers) throws QueryParseException, QueryExecutionException
-  {
-    Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
-
-    // Per JoSql, select FROM <target> the target must be a object class that
-    // corresponds to a "table row", while the table (list of Objects) is put
-    // in the query by query.execute(List<Object>).
-    // In our case, the row is always a ZNRecord or ZNRecordRow. But in SQL,
-    // the from target is a "table name".
-    String fromTargetString = parseFromTarget(josql);
-
-    String variableName = resolveTargetVariable(fromTargetString);
-    if (variableName == null)
-    {
-      throw new HelixException(
-          "Unknown query target "
-              + fromTargetString
-              + ". Target should be PARTITIONS, LIVEINSTANCES, CONFIGS, STATEMODELDEFS, IDEALSTATES, EXTERNALVIEW, and corresponding flat Tables");
-    }
-    Object fromTarget = josqlQuery.getVariable(variableName);
-
-    // IDEALSTATES and EXTERNALVIEW are bound as maps keyed by resource name;
-    // their rows are the map values. Casting such a map straight to List
-    // fails with a ClassCastException.
-    List<Object> fromTargetList;
-    if (fromTarget instanceof Map)
-    {
-      fromTargetList = new ArrayList<Object>(((Map<?, ?>) fromTarget).values());
-    }
-    else
-    {
-      fromTargetList = (List<Object>) fromTarget;
-    }
-
-    // Decide on the resolved variable name so that the flat-table check is
-    // case-insensitive, exactly like the target match itself.
-    boolean flatTarget = variableName.endsWith(FLATTABLE);
-    String rowClassName = flatTarget ? ZNRecordRow.class.getName() : ZNRecord.class.getName();
-
-    // Substitute the row class for the table name at the position of the FROM
-    // target itself. The target is spliced in literally: it is not a regular
-    // expression, and an earlier occurrence of the same text (for example in
-    // the SELECT clause) must stay untouched.
-    int targetIndex = josql.indexOf(fromTargetString, josql.indexOf("FROM"));
-    josql = josql.substring(0, targetIndex)
-        + rowClassName
-        + josql.substring(targetIndex + fromTargetString.length());
-
-    josqlQuery.parse(josql);
-    QueryResults qr = josqlQuery.execute(fromTargetList);
-    return qr.getResults();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/DataAccessorBasedTupleReader.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/DataAccessorBasedTupleReader.java b/helix-core/src/main/java/com/linkedin/helix/josql/DataAccessorBasedTupleReader.java
deleted file mode 100644
index 855b7f7..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/DataAccessorBasedTupleReader.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package com.linkedin.helix.josql;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-
-/**
- * Tuple reader that resolves a path of the form PROPERTYTYPE/key/key/...
- * (where a key may be the wildcard "*") to ZNRecords read through the base
- * data accessor, and caches the result per path until {@link #reset()}.
- */
-public class DataAccessorBasedTupleReader implements ZNRecordQueryProcessor.ZNRecordTupleReader
-{
-  Map<String, List<ZNRecord>> _cache = new HashMap<String, List<ZNRecord>>();
-  private final HelixDataAccessor _dataAccessor;
-  private final String _clusterName;
-
-  public DataAccessorBasedTupleReader(HelixDataAccessor dataAccessor, String clusterName)
-  {
-    _dataAccessor = dataAccessor;
-    _clusterName = clusterName;
-  }
-
-  /**
-   * Returns the records stored under the given path, reading them on first use.
-   *
-   * @param path PROPERTYTYPE followed by zero or more "/"-separated keys
-   * @return the records found; also cached under {@code path}
-   * @throws Exception if the path is empty, starts with a wildcard, or its
-   *                   first element is not a PropertyType name
-   */
-  @Override
-  public List<ZNRecord> get(String path) throws Exception
-  {
-    // first check cache
-    if (_cache.containsKey(path))
-    {
-      return _cache.get(path);
-    }
-
-    String[] tmp = path.split("/");
-    if (tmp.length == 0 || tmp[0].equals("*"))
-    {
-      throw new Exception("Unable to read " + path);
-    }
-
-    PropertyType parentProperty;
-    try
-    {
-      parentProperty = PropertyType.valueOf(tmp[0]);
-    }
-    catch (Exception e)
-    {
-      throw new Exception("Unable to translate " + tmp[0] + " into a valid property type", e);
-    }
-
-    boolean pathHasWildcard = path.contains("*");
-    // Everything after the property type.
-    String[] childKeys = Arrays.copyOfRange(tmp, 1, tmp.length);
-
-    List<ZNRecord> ret;
-    String parentPath = PropertyPathConfig.getPath(parentProperty, _clusterName);
-
-    if (pathHasWildcard)
-    {
-      ret = new ArrayList<ZNRecord>();
-      for (String expandedPath : expandWildCards(parentProperty, childKeys))
-      {
-        String fullPath = parentPath + "/" + expandedPath;
-        try
-        {
-          ZNRecord record = _dataAccessor.getBaseDataAccessor().get(fullPath, null, 0);
-          if (record != null)
-          {
-            ret.add(record);
-          }
-        }
-        catch (ZkNoNodeException e)
-        {
-          // OK: an expanded path without a node contributes no record.
-        }
-      }
-    }
-    else
-    {
-      String propertyPath = PropertyPathConfig.getPath(parentProperty, _clusterName, childKeys);
-      ret = _dataAccessor.getBaseDataAccessor().getChildren(propertyPath, null, 0);
-      if (ret.size() == 0) // could be a leaf node - try accessing property directly
-      {
-        StringBuilder fullPath = new StringBuilder(parentPath);
-        for (String childKey : childKeys)
-        {
-          fullPath.append("/").append(childKey);
-        }
-        ZNRecord record =
-            _dataAccessor.getBaseDataAccessor().get(fullPath.toString(), null, 0);
-        if (record != null)
-        {
-          ret = Arrays.asList(record);
-        }
-      }
-    }
-
-    _cache.put(path, ret);
-    return ret;
-  }
-
-  /**
-   * Expands every "*" element of the path into the child names found at that
-   * position, returning all resulting "/"-joined paths.
-   *
-   * @param parentProperty property type the path elements live under
-   * @param pathElements   path elements after the property type; not modified
-   * @return the expanded paths; empty if there are no elements or nothing matches
-   */
-  private List<String> expandWildCards(PropertyType parentProperty, String[] pathElements)
-  {
-    if (pathElements.length == 0)
-    {
-      return Collections.emptyList();
-    }
-
-    StringBuilder joined = new StringBuilder();
-    for (int i = 0; i < pathElements.length; i++)
-    {
-      if (i > 0)
-      {
-        joined.append("/");
-      }
-      joined.append(pathElements[i]);
-    }
-    String path = joined.toString();
-    if (!path.contains("*"))
-    {
-      return Arrays.asList(path);
-    }
-
-    List<String> prefix = new ArrayList<String>();
-    for (int i = 0; i < pathElements.length; i++)
-    {
-      if (!pathElements[i].equals("*"))
-      {
-        prefix.add(pathElements[i]);
-        continue;
-      }
-
-      // First wildcard: substitute each child found under the prefix and
-      // expand the remainder recursively.
-      String parentPath = PropertyPathConfig.getPath(parentProperty, _clusterName,
-          prefix.toArray(new String[0]));
-      Set<String> childNames = new HashSet<String>(
-          _dataAccessor.getBaseDataAccessor().getChildNames(parentPath, 0));
-
-      List<String> ret = new ArrayList<String>();
-      for (String child : childNames)
-      {
-        // Recurse on a copy. Writing the child into the shared array lets a
-        // nested call overwrite a later "*" with a concrete name, so the next
-        // sibling would no longer see (and expand) that wildcard.
-        String[] expanded = pathElements.clone();
-        expanded[i] = child;
-        ret.addAll(expandWildCards(parentProperty, expanded));
-      }
-      return ret;
-    }
-
-    // "*" only occurs inside an element, never as a whole element.
-    return Collections.emptyList();
-  }
-
-  /**
-   * Registers an in-memory table under the given path, so later
-   * {@link #get(String)} calls for that path return it.
-   */
-  public void setTempTable(String path, List<ZNRecord> table)
-  {
-    _cache.put(path, table);
-  }
-
-  /**
-   * Drops all cached and temporary tables.
-   */
-  @Override
-  public void reset()
-  {
-    _cache.clear();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/JsqlQueryListProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/JsqlQueryListProcessor.java b/helix-core/src/main/java/com/linkedin/helix/josql/JsqlQueryListProcessor.java
deleted file mode 100644
index 766ef64..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/JsqlQueryListProcessor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.linkedin.helix.josql;
-
-import java.util.List;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.ZNRecord;
-
-/**
- * Executes a list of combined queries in order. A combined query has the form
- * {query}-->{resultName}; the result of each query is registered under its
- * resultName so that later queries in the list can read it as a table.
- */
-public class JsqlQueryListProcessor
-{
-  public static final String SEPARATOR = "-->";
-  private static Logger _logger = Logger.getLogger(JsqlQueryListProcessor.class);
-
-  /**
-   * Runs every combined query in list order with one shared tuple reader.
-   *
-   * @param accessor          data accessor the tuple reader reads from
-   * @param clusterName       cluster the queries run against
-   * @param combinedQueryList queries, each optionally followed by
-   *                          {@link #SEPARATOR} and a result name
-   * @return the result of the last query, or null if the list is empty
-   */
-  public static List<ZNRecord> executeQueryList(HelixDataAccessor accessor, String clusterName, List<String> combinedQueryList) throws Exception
-  {
-    ZNRecordQueryProcessor processor = new ZNRecordQueryProcessor();
-    DataAccessorBasedTupleReader tupleReader = new DataAccessorBasedTupleReader(accessor, clusterName);
-    List<ZNRecord> lastResult = null;
-
-    for (int i = 0; i < combinedQueryList.size(); i++)
-    {
-      String combinedQuery = combinedQueryList.get(i);
-      String query;
-      String resultName;
-
-      int separatorAt = combinedQuery.indexOf(SEPARATOR);
-      if (separatorAt >= 0)
-      {
-        query = combinedQuery.substring(0, separatorAt);
-        resultName = combinedQuery.substring(separatorAt + SEPARATOR.length()).trim();
-        if (resultName.length() == 0)
-        {
-          _logger.error("Combined query " + combinedQuery + " " + i + " doe not contain resultName");
-        }
-      }
-      else
-      {
-        // No separator: the whole string is the query and nothing is saved.
-        // Only the last query is expected to come without a result name.
-        query = combinedQuery;
-        resultName = "";
-        if (i < combinedQueryList.size() - 1)
-        {
-          _logger.error("Combined query " + combinedQuery + " " + i +" doe not contain " + SEPARATOR);
-        }
-      }
-
-      lastResult = processor.execute(query, tupleReader);
-      if (resultName.length() > 0)
-      {
-        tupleReader.setTempTable(resultName, lastResult);
-      }
-    }
-
-    return lastResult;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordJosqlFunctionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordJosqlFunctionHandler.java b/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordJosqlFunctionHandler.java
deleted file mode 100644
index 11c7412..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordJosqlFunctionHandler.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.josql;
-
-import java.util.Map;
-
-import org.josql.functions.AbstractFunctionHandler;
-
-import com.linkedin.helix.ZNRecord;
-
-
-/**
- * JoSQL function handler with helper functions for inspecting the simple,
- * list and map fields of a ZNRecord. The lookups tolerate missing keys and
- * null stored values instead of failing with a NullPointerException.
- */
-public class ZNRecordJosqlFunctionHandler extends AbstractFunctionHandler
-{
-  /**
-   * @return true if the record has the simple field and its value equals
-   *         {@code field} (a null {@code field} matches a null value)
-   */
-  public boolean hasSimpleField(ZNRecord record, String fieldName, String field)
-  {
-    if (!record.getSimpleFields().containsKey(fieldName))
-    {
-      return false;
-    }
-    String actual = record.getSimpleField(fieldName);
-    return field == null ? actual == null : field.equals(actual);
-  }
-
-  /**
-   * @return true if the record has the list field and the list contains
-   *         {@code field}
-   */
-  public boolean hasListField(ZNRecord record, String fieldName, String field)
-  {
-    if (!record.getListFields().containsKey(fieldName))
-    {
-      return false;
-    }
-    java.util.List<String> values = record.getListField(fieldName);
-    return values != null && values.contains(field);
-  }
-
-  /**
-   * @return true if the record has the map field, the map contains
-   *         {@code mapKey}, and the value stored there equals {@code mapValue}
-   */
-  public boolean hasMapFieldValue(ZNRecord record, String fieldName, String mapKey, String mapValue)
-  {
-    if (!record.getMapFields().containsKey(fieldName))
-    {
-      return false;
-    }
-    Map<String, String> mapField = record.getMapField(fieldName);
-    if (mapField == null || !mapField.containsKey(mapKey))
-    {
-      return false;
-    }
-    String actual = mapField.get(mapKey);
-    // A key may be present with a null value; compare without dereferencing it.
-    return actual == null ? mapValue == null : actual.equals(mapValue);
-  }
-
-  /**
-   * @return true if the record has the map field and the map contains
-   *         {@code mapKey}
-   */
-  public boolean hasMapFieldKey(ZNRecord record, String fieldName, String mapKey)
-  {
-    if (!record.getMapFields().containsKey(fieldName))
-    {
-      return false;
-    }
-    Map<String, String> mapField = record.getMapField(fieldName);
-    return mapField != null && mapField.containsKey(mapKey);
-  }
-
-  /**
-   * @return the value stored under {@code mapKey} in the map field, or null
-   *         if the field or the key is absent
-   */
-  public String getMapFieldValue(ZNRecord record, String fieldName, String mapKey)
-  {
-    if (!record.getMapFields().containsKey(fieldName))
-    {
-      return null;
-    }
-    Map<String, String> mapField = record.getMapField(fieldName);
-    return mapField == null ? null : mapField.get(mapKey);
-  }
-
-  /**
-   * @return the value of the simple field {@code key}, as returned by the record
-   */
-  public String getSimpleFieldValue(ZNRecord record, String key)
-  {
-    return record.getSimpleField(key);
-  }
-
-  /**
-   * @return the record stored under {@code key}, or null if there is none
-   */
-  public ZNRecord getZNRecordFromMap(Map<String, ZNRecord> recordMap, String key)
-  {
-    return recordMap.get(key);
-  }
-
-  /**
-   * @return the record stored under {@code key} and then {@code subKey}, or
-   *         null if either level has no entry
-   */
-  public ZNRecord getZNRecordFromMap(Map<String, Map<String, ZNRecord>> recordMap, String key, String subKey)
-  {
-    Map<String, ZNRecord> inner = recordMap.get(key);
-    // An unknown outer key is reported as "no record" rather than an NPE.
-    return inner == null ? null : inner.get(subKey);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordQueryProcessor.java b/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordQueryProcessor.java
deleted file mode 100644
index 8f3b91f..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordQueryProcessor.java
+++ /dev/null
@@ -1,1401 +0,0 @@
-package com.linkedin.helix.josql;
-
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-
-import com.linkedin.helix.ZNRecord;
-
-import net.sf.jsqlparser.expression.*;
-import net.sf.jsqlparser.expression.StringValue;
-import net.sf.jsqlparser.expression.operators.arithmetic.*;
-import net.sf.jsqlparser.expression.operators.conditional.*;
-import net.sf.jsqlparser.expression.operators.relational.*;
-import net.sf.jsqlparser.parser.CCJSqlParserManager;
-import net.sf.jsqlparser.schema.Column;
-import net.sf.jsqlparser.schema.Table;
-import net.sf.jsqlparser.statement.*;
-import net.sf.jsqlparser.statement.select.*;
-
-/**
- * Captures the sort direction and the sort expression of a single ORDER BY element.
- */
-class OrderByVisitorImpl implements OrderByVisitor
-{
-  // direction of the visited element; stays ascending until an element is visited
-  boolean isAsc = true;
-  // expression of the visited element; stays null until an element is visited
-  Expression expression = null;
-
-  @Override
-  public void visit(OrderByElement orderBy)
-  {
-    this.expression = orderBy.getExpression();
-    this.isAsc = orderBy.isAsc();
-  }
-}
-
-/**
- * Collects the items of a FROM clause. Every visited table or sub-select is stored under
- * the position at which it was visited; sub-joins are rejected.
- */
-class FromItemVisitorImpl implements FromItemVisitor
-{
-  // plain tables, keyed by visiting position
-  public Map<Integer, Table> tables = new HashMap<Integer, Table>();
-  // parsed sub-selects, keyed by visiting position
-  public Map<Integer, SelectVisitorImpl> subSelects = new HashMap<Integer, SelectVisitorImpl>();
-  // joins are appended by the code that drives this visitor, not by the visit methods
-  public List<Join> joins = new ArrayList<Join>();
-  // position that the next visited table or sub-select will receive
-  int fromItemIndex = 0;
-
-  @Override
-  public void visit(Table table)
-  {
-    tables.put(fromItemIndex, table);
-    fromItemIndex++;
-  }
-
-  @Override
-  public void visit(SubSelect subSelect)
-  {
-    SelectVisitorImpl parsedSubSelect = new SelectVisitorImpl();
-    subSelect.getSelectBody().accept(parsedSubSelect);
-    parsedSubSelect.alias = subSelect.getAlias();
-
-    subSelects.put(fromItemIndex, parsedSubSelect);
-    fromItemIndex++;
-  }
-
-  @Override
-  public void visit(SubJoin subjoin)
-  {
-    throw new UnsupportedOperationException();
-  }
-}
-
-// Parsed form of a function call: the function name plus one visitor per argument
-// expression, in argument order (filled in by ExpressionVisitorImpl.visit(Function)).
-class FunctionImpl
-{
- String name;
- List<ExpressionVisitorImpl> expressionVisitors = new ArrayList<ExpressionVisitorImpl>();
-}
-
-// Classifies a single expression node. Accepting an expression stores that node (or its
-// literal value) in the one field that matches its kind; the other fields stay null.
-// Apart from function parameters, child expressions are NOT visited here, only the node
-// itself is recorded. Every expression kind that has no field throws
-// UnsupportedOperationException.
-class ExpressionVisitorImpl implements ExpressionVisitor
-{
- Column column = null;
- FunctionImpl function = null;
- // literal value: a Double, a Long or a String (see the DoubleValue/LongValue/StringValue visits)
- Object atomVal = null;
- EqualsTo equalsTo = null;
- AndExpression andExpression = null;
- OrExpression orExpression = null;
- MinorThan minorThan = null;
- GreaterThan greaterThan = null;
- Subtraction subtraction = null;
- Addition addition = null;
- Parenthesis parenthesis = null;
-
- // the expression this visitor was created for; stored by the constructor only
- Expression expression;
-
- ExpressionVisitorImpl(Expression expression)
- {
- this.expression = expression;
- }
-
- @Override
- public void visit(NullValue nullValue)
- {
- throw new UnsupportedOperationException();
- }
-
- // Records the function name and visits each parameter with its own visitor.
- @Override
- public void visit(Function f)
- {
- function = new FunctionImpl();
- function.name = f.getName();
- ExpressionList parameters = f.getParameters();
- if(parameters != null)
- {
- @SuppressWarnings("unchecked")
- List<Expression> expressions = parameters.getExpressions();
- for(Expression e : expressions)
- {
- ExpressionVisitorImpl v = new ExpressionVisitorImpl(e);
- e.accept(v);
- function.expressionVisitors.add(v);
- }
- }
- }
-
- @Override
- public void visit(InverseExpression inverseExpression)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(JdbcParameter jdbcParameter)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(DoubleValue doubleValue)
- {
- atomVal = Double.valueOf(doubleValue.getValue());
- }
-
- @Override
- public void visit(LongValue longValue)
- {
- atomVal = Long.valueOf(longValue.getValue());
- }
-
- @Override
- public void visit(DateValue dateValue)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(TimeValue timeValue)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(TimestampValue timestampValue)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(Parenthesis parenthesis)
- {
- this.parenthesis = parenthesis;
- }
-
- @Override
- public void visit(StringValue stringValue)
- {
- atomVal = stringValue.getValue();
- }
-
- @Override
- public void visit(Addition addition)
- {
- this.addition = addition;
- }
-
- @Override
- public void visit(Division division)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(Multiplication multiplication)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(Subtraction subtraction)
- {
- this.subtraction = subtraction;
- }
-
- @Override
- public void visit(AndExpression andExpression)
- {
- this.andExpression = andExpression;
- }
-
- @Override
- public void visit(OrExpression orExpression)
- {
- this.orExpression = orExpression;
- }
-
- @Override
- public void visit(Between between)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(EqualsTo equalsTo)
- {
- this.equalsTo = equalsTo;
- }
-
- @Override
- public void visit(GreaterThan greaterThan)
- {
- this.greaterThan = greaterThan;
- }
-
- @Override
- public void visit(GreaterThanEquals greaterThanEquals)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(InExpression inExpression)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(IsNullExpression isNullExpression)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(LikeExpression likeExpression)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(MinorThan minorThan)
- {
- this.minorThan = minorThan;
- }
-
- @Override
- public void visit(MinorThanEquals minorThanEquals)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(NotEqualsTo notEqualsTo)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(Column tableColumn)
- {
- column = tableColumn;
- }
-
- @Override
- public void visit(SubSelect subSelect)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(CaseExpression caseExpression)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(WhenClause whenClause)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(ExistsExpression existsExpression)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(AllComparisonExpression allComparisonExpression)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(AnyComparisonExpression anyComparisonExpression)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(Concat concat)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(Matches matches)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(BitwiseAnd bitwiseAnd)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(BitwiseOr bitwiseOr)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void visit(BitwiseXor bitwiseXor)
- {
- throw new UnsupportedOperationException();
- }
-}
-
-/**
- * Captures one item of a select list: either the "all columns" marker or an expression
- * together with its optional alias. Table-qualified wildcards are rejected.
- */
-class SelectItemVisitorImpl implements SelectItemVisitor
-{
-  // alias of the visited expression item, as reported by the parser
-  String alias;
-  // set once an "all columns" item has been visited
-  boolean shouldSelectAllCols = false;
-  // expression of the visited expression item; null for an "all columns" item
-  Expression expression = null;
-
-  @Override
-  public void visit(AllColumns allColumns)
-  {
-    this.shouldSelectAllCols = true;
-  }
-
-  @Override
-  public void visit(AllTableColumns allTableColumns)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void visit(SelectExpressionItem selectExpressionItem)
-  {
-    this.expression = selectExpressionItem.getExpression();
-    this.alias = selectExpressionItem.getAlias();
-  }
-}
-
-class SelectVisitorImpl implements SelectVisitor
-{
- String alias = null;
- public List<SelectItemVisitorImpl> selectItemVisitors = new ArrayList<SelectItemVisitorImpl>();
- public FromItemVisitorImpl fromItemVisitor = new FromItemVisitorImpl();
- public List<OrderByVisitorImpl> orderByVisitors = null;
- public ExpressionVisitorImpl whereVisitor = null;
- public List<Column> groupByColumnReferences = null;
- public Union union = null;
- public Limit limit = null;
-
- @SuppressWarnings("unchecked")
- @Override
- public void visit(PlainSelect plainSelect)
- {
- List<SelectItem> selectItems = plainSelect.getSelectItems();
- for(SelectItem selectItem : selectItems)
- {
- SelectItemVisitorImpl selectItemVisitor = new SelectItemVisitorImpl();
- selectItem.accept(selectItemVisitor);
- selectItemVisitors.add(selectItemVisitor);
- }
-
- plainSelect.getFromItem().accept(fromItemVisitor);
- if (plainSelect.getJoins() != null)
- {
- for (Iterator<Join> joinsIt = plainSelect.getJoins().iterator(); joinsIt.hasNext();)
- {
- Join join = joinsIt.next();
- if(join.getUsingColumns() == null || join.getUsingColumns().size() == 0)
- {
- throw new UnsupportedOperationException("'using' directive required for join");
- }
- if(join.getUsingColumns().size() != 2)
- {
- throw new UnsupportedOperationException("'using' directive for join requires exactly two columns");
- }
- join.getRightItem().accept(fromItemVisitor);
- fromItemVisitor.joins.add(join);
- }
- }
-
- Expression where = plainSelect.getWhere();
- if(where != null)
- {
- whereVisitor = new ExpressionVisitorImpl(where);
- where.accept(whereVisitor);
- }
-
- groupByColumnReferences = plainSelect.getGroupByColumnReferences();
-
- List<OrderByElement> orderByElements = plainSelect.getOrderByElements();
- if(orderByElements != null)
- {
- orderByVisitors = new ArrayList<OrderByVisitorImpl>();
- for(OrderByElement e : orderByElements)
- {
- OrderByVisitorImpl impl = new OrderByVisitorImpl();
- e.accept(impl);
- orderByVisitors.add(impl);
- }
- }
-
- limit = plainSelect.getLimit();
- }
-
- @Override
- public void visit(Union union)
- {
- this.union = union;
- }
-}
-
-public class ZNRecordQueryProcessor
-{
- // Source of the tuples a query runs over. The processor calls get() with the table name
- // taken from the FROM clause (backticks removed) and uses the returned records as rows.
- public interface ZNRecordTupleReader
- {
- List<ZNRecord> get(String path) throws Exception;
- // NOTE(review): never called by the processor in this file; presumably clears reader
- // state between queries -- confirm against the implementations.
- void reset();
- }
-
-  /**
-   * Rewrites every <code>functionName(arg1, arg2, ...)</code> call in {@code sql} into the
-   * schema-qualified table reference <code>functionName.`arg1_____arg2_____...`</code>, which
-   * the SQL parser accepts and which getSourceTables() later recognises by its schema name.
-   * Arguments are trimmed and stripped of backticks before being joined.
-   *
-   * Backticks elsewhere in the query are deliberately left alone: they are needed to quote
-   * table names for the parser. (A former {@code sql.replaceAll("`", "")} statement here
-   * discarded its result and was therefore a no-op; it has been removed.)
-   *
-   * @param sql          the query text
-   * @param functionName name of the pseudo function to rewrite
-   * @return the rewritten query; {@code sql} unchanged in content when nothing matches
-   * @throws IllegalArgumentException if a call carries no argument at all
-   */
-  private String replaceExplodeFields(String sql, String functionName)
-  {
-    Pattern pattern = Pattern.compile(functionName + "\\((.*?)\\)");
-    Matcher m = pattern.matcher(sql);
-    StringBuffer rewritten = new StringBuffer();
-    while (m.find())
-    {
-      String args[] = m.group(1).split(",");
-      if (args.length == 0)
-      {
-        // the table name is mandatory; the column list after it is optional
-        throw new IllegalArgumentException(functionName + " requires at least 1 parameter");
-      }
-
-      StringBuilder joined = new StringBuilder();
-      for (int i = 0; i < args.length; i++)
-      {
-        if (i > 0)
-        {
-          joined.append("_____");
-        }
-        joined.append(args[i].trim().replaceAll("`", ""));
-      }
-      String tableRef = functionName + ".`" + joined + "`";
-
-      // quote the replacement so '$' and '\' inside an argument are taken literally
-      m.appendReplacement(rewritten, Matcher.quoteReplacement(tableRef));
-    }
-    m.appendTail(rewritten);
-    return rewritten.toString();
-  }
-
-  /**
-   * Parses {@code sql} and runs it against the tuples supplied by {@code tupleReader}.
-   * explodeList(...) and explodeMap(...) calls are rewritten into table references before
-   * parsing.
-   *
-   * @return the result rows
-   * @throws UnsupportedOperationException if the statement is not a SELECT
-   */
-  public List<ZNRecord> execute(String sql, ZNRecordTupleReader tupleReader) throws Exception
-  {
-    String rewrittenSql = replaceExplodeFields(sql, "explodeList");
-    rewrittenSql = replaceExplodeFields(rewrittenSql, "explodeMap");
-
-    Statement statement = new CCJSqlParserManager().parse(new StringReader(rewrittenSql));
-    if (!(statement instanceof Select))
-    {
-      throw new UnsupportedOperationException(statement.toString());
-    }
-
-    SelectVisitorImpl selectVisitor = new SelectVisitorImpl();
-    ((Select) statement).getSelectBody().accept(selectVisitor);
-    return executeSelect(selectVisitor, tupleReader);
-  }
-
-  /**
-   * Truncates {@code input} in place to the row count of {@code limit}. Does nothing when
-   * there is no limit, no input, or the input is already short enough.
-   */
-  private void applyLimit(Limit limit, List<ZNRecord> input)
-  {
-    if (limit == null)
-    {
-      return;
-    }
-
-    int rowCount = (int) limit.getRowCount();
-    if (input == null || rowCount >= input.size())
-    {
-      return;
-    }
-    // drop everything after the first rowCount rows
-    input.subList(rowCount, input.size()).clear();
-  }
-
-  /**
-   * Compares two records by the value the given expression evaluates to on each of them.
-   *
-   * @return the {@code compareTo} result when both values are Long, both Double or both
-   *         String; {@code null} when either value is null or the types do not match
-   */
-  private Integer compareRecords(ZNRecord o1, ZNRecord o2, ExpressionVisitorImpl expressionVisitor)
-  {
-    Object first = evalExpression(expressionVisitor, o1);
-    Object second = evalExpression(expressionVisitor, o2);
-
-    if (first instanceof Long && second instanceof Long)
-    {
-      return ((Long) first).compareTo((Long) second);
-    }
-    if (first instanceof Double && second instanceof Double)
-    {
-      return ((Double) first).compareTo((Double) second);
-    }
-    if (first instanceof String && second instanceof String)
-    {
-      return ((String) first).compareTo((String) second);
-    }
-    // null operand or mismatching types: not comparable
-    return null;
-  }
-
- private void applyOrderBy(final List<OrderByVisitorImpl> orderByVisitors, final List<ZNRecord> result)
- {
- class RecordComparator implements Comparator<ZNRecord>
- {
- @Override
- public int compare(ZNRecord o1, ZNRecord o2)
- {
- for(OrderByVisitorImpl visitor : orderByVisitors)
- {
- ExpressionVisitorImpl expressionVisitor = new ExpressionVisitorImpl(visitor.expression);
- visitor.expression.accept(expressionVisitor);
- Integer compareResult = compareRecords(o1, o2, expressionVisitor);
-
- if(compareResult == null)
- {
- return -1; //not comparable. return arbitrary "less" value
- }
-
- if(compareResult != 0)
- {
- return (visitor.isAsc ? compareResult : -compareResult);
- }
- }
-
- return 0;
- }
- }
-
- if(orderByVisitors != null)
- {
- Collections.sort(result, new RecordComparator());
- }
- }
-
- // Runs one plain (non-union) select: load the source tables, join them, filter with the
- // WHERE clause, then either group, aggregate, or project + order + limit.
- // Statement order matters here: the temporary projection below mutates the joined
- // records before the WHERE and GROUP BY steps read them.
- private List<ZNRecord> executePlainSelect(SelectVisitorImpl v, ZNRecordTupleReader tupleReader) throws Exception
- {
- SourceTableList sourceTables = getSourceTables(v, tupleReader);
-
- // fold all FROM items (positions 0 .. size-1) into one joined table
- SourceTable joinResult = getJoin(v.fromItemVisitor.joins, sourceTables, 0, sourceTables.size()-1);
- List<ZNRecord> result = joinResult.tuples;
-
- //XXX: hack needed for now to allow aliased/computed columns to be used in where and group by
- //figure out a better way
- List<ZNRecord> tempProjection = getProjection(v, result);
- if(tempProjection != null)
- {
- // tempProjection has one entry per input record, in the same order
- for(int i = 0; i < result.size(); i++)
- {
- ZNRecord record = result.get(i);
- if(record instanceof ZNRecordSet)
- {
- ((ZNRecordSet)record).addRecord("xxx_random_"+System.currentTimeMillis(), tempProjection.get(i));
- }
- else
- {
- record.merge(tempProjection.get(i));
- }
- }
- }
-
- if(v.whereVisitor != null)
- {
- result = applyWhereClause(v, result);
- }
-
- // NOTE(review): ORDER BY and LIMIT are only applied on the plain projection path below,
- // not to grouped or aggregated results -- confirm this is intended.
- if(v.groupByColumnReferences != null)
- {
- result = applyGroupBy(v, result);
- }
- else
- {
- ZNRecord aggregate = getAggregateColumns(v, result);
- if(aggregate != null)
- {
- // a min()/max() select item collapses the result to a single row
- result = Arrays.asList(aggregate);
- }
- else
- {
- // a null projection means "select *": keep the full records
- List<ZNRecord> projection = getProjection(v, result);
- if(projection != null)
- {
- result = projection;
- }
-
- applyOrderBy(v.orderByVisitors, result);
- applyLimit(v.limit, result);
- }
- }
-
- return result;
- }
-
-  /**
-   * Joins the source tables at positions {@code fromIndex..toIndex}. The table at
-   * {@code fromIndex} is joined with the (recursively computed) join of all tables after
-   * it, using the two "using" columns of the join at position {@code fromIndex}. The column
-   * whose table name equals the left table's name is taken as the left column.
-   */
-  private SourceTable getJoin(List<Join> joins, SourceTableList sourceTables, int fromIndex, int toIndex)
-  {
-    assert(joins.size() == sourceTables.size()-1);
-    SourceTable leftTable = sourceTables.get(fromIndex);
-    if (fromIndex == toIndex)
-    {
-      // a single table: nothing to join
-      return leftTable;
-    }
-
-    List<Column> usingColumns = joins.get(fromIndex).getUsingColumns();
-    boolean firstBelongsToLeft = usingColumns.get(0).getTable().getName().equals(leftTable.name);
-    Column leftCol = usingColumns.get(firstBelongsToLeft ? 0 : 1);
-    Column rightCol = usingColumns.get(firstBelongsToLeft ? 1 : 0);
-
-    SourceTable rightTable = getJoin(joins, sourceTables, fromIndex + 1, toIndex);
-    return getJoin(leftCol, rightCol, leftTable, rightTable);
-  }
-
-  /**
-   * Hash-joins two tables on {@code leftCol = rightCol}. Right records are indexed by their
-   * join value; every left record is then paired with each right record carrying the same
-   * value. Records whose join value is null never match.
-   *
-   * Each output tuple is a ZNRecordSet holding the participating records under their table
-   * names. A right record that already is a ZNRecordSet (the result of an earlier join) is
-   * extended in place with the left record.
-   * NOTE(review): when several left records match the same right ZNRecordSet, that one
-   * object is updated and added repeatedly, so earlier pairings appear to be overwritten --
-   * confirm and fix together with ZNRecordSet.
-   *
-   * @throws IllegalArgumentException if either column carries no table name. The original
-   *         check compared the Table object with "" and could therefore never fire.
-   */
-  private SourceTable getJoin(Column leftCol,
-                              Column rightCol,
-                              SourceTable leftTable,
-                              SourceTable rightTable)
-  {
-    if (isUnqualified(leftCol))
-    {
-      throw new IllegalArgumentException(leftCol.toString() + " used in join should be fully qualified");
-    }
-    if (isUnqualified(rightCol))
-    {
-      throw new IllegalArgumentException(rightCol.toString() + " used in join should be fully qualified");
-    }
-
-    // index the right side by join value
-    Map<String, Set<ZNRecord>> rightMap = new HashMap<String, Set<ZNRecord>>();
-    for (ZNRecord record : rightTable.tuples)
-    {
-      String rightVal = getSimpleColumn(record, rightCol);
-      if (rightVal != null)
-      {
-        if (!rightMap.containsKey(rightVal))
-        {
-          rightMap.put(rightVal, new HashSet<ZNRecord>());
-        }
-        rightMap.get(rightVal).add(record);
-      }
-    }
-
-    // probe with the left side
-    List<ZNRecord> tuples = new ArrayList<ZNRecord>();
-    for (ZNRecord leftRecord : leftTable.tuples)
-    {
-      String leftVal = getSimpleColumn(leftRecord, leftCol);
-      if (leftVal != null && rightMap.containsKey(leftVal))
-      {
-        for (ZNRecord rightRecord : rightMap.get(leftVal))
-        {
-          if (rightRecord instanceof ZNRecordSet)
-          {
-            ((ZNRecordSet) rightRecord).addRecord(leftTable.name, leftRecord);
-            tuples.add(rightRecord);
-          }
-          else
-          {
-            ZNRecordSet recordSet = new ZNRecordSet("");
-            recordSet.addRecord(rightTable.name, rightRecord);
-            recordSet.addRecord(leftTable.name, leftRecord);
-            tuples.add(recordSet);
-          }
-        }
-      }
-    }
-
-    return new SourceTable("xxx_random_" + System.currentTimeMillis(), 0, tuples);
-  }
-
-  /** Returns true when the column has no table, or a table with a null or empty name. */
-  private static boolean isUnqualified(Column col)
-  {
-    Table table = col.getTable();
-    return table == null || table.getName() == null || table.getName().length() == 0;
-  }
-
-  /**
-   * Evaluates both sides of a binary comparison on {@code record} and compares them.
-   * Two strings compare lexicographically. If either side is a Long or Double, both sides
-   * are converted with toNum(): two Longs compare exactly, any other numeric pair compares
-   * as doubles.
-   *
-   * The numeric branch used to test {@code l instanceof Double} twice instead of testing
-   * {@code r}, so Double-vs-Long failed with a ClassCastException and Long-vs-Double was
-   * reported as not comparable.
-   *
-   * @return the comparison result, or {@code null} when either side is null or cannot be
-   *         converted to a number
-   * @throws IllegalArgumentException if the two values are neither both strings nor numeric
-   */
-  private Integer evalComparisonExpression(BinaryExpression expression, ZNRecord record)
-  {
-    Object leftVal = evalExpression(expression.getLeftExpression(), record);
-    Object rightVal = evalExpression(expression.getRightExpression(), record);
-    if (leftVal == null || rightVal == null)
-    {
-      return null;
-    }
-
-    if (leftVal instanceof String && rightVal instanceof String)
-    {
-      return ((String) leftVal).compareTo((String) rightVal);
-    }
-
-    if (leftVal instanceof Long || rightVal instanceof Long ||
-        leftVal instanceof Double || rightVal instanceof Double)
-    {
-      Object l = toNum(leftVal);
-      Object r = toNum(rightVal);
-      if (l == null || r == null)
-      {
-        // one side is not numeric (e.g. a non-numeric string)
-        return null;
-      }
-      if (l instanceof Long && r instanceof Long)
-      {
-        return ((Long) l).compareTo((Long) r);
-      }
-      // at least one Double: compare in floating point
-      return Double.compare(((Number) l).doubleValue(), ((Number) r).doubleValue());
-    }
-
-    throw new IllegalArgumentException("Left and right parameters of comparison cannot be compared");
-  }
-
-  /**
-   * Evaluates an already-visited expression against {@code record}.
-   *
-   * Result by expression kind: column -> its simple field value (String); function -> the
-   * function result; literal -> the literal; =, <, > -> Boolean (false when the operands are
-   * not comparable); +, - -> Long or Double when both operands convert to the same numeric
-   * type; AND / OR -> Boolean; parenthesis -> the value of the inner expression.
-   *
-   * OR is evaluated here as well: the visitor has always captured OR expressions, but they
-   * were never evaluated, so any OR predicate was rejected as unsupported. As with AND, an
-   * operand that evaluates to null counts as false.
-   *
-   * @return the value, or {@code null} when the expression cannot be evaluated
-   * @throws IllegalArgumentException if an AND/OR operand evaluates to a non-boolean value
-   */
-  private Object evalExpression(ExpressionVisitorImpl visitor, ZNRecord record)
-  {
-    if(visitor.column != null)
-    {
-      return getSimpleColumn(record, visitor.column);
-    }
-    else if(visitor.function != null)
-    {
-      return evalFunction(record, visitor.function);
-    }
-    else if(visitor.atomVal != null)
-    {
-      return visitor.atomVal;
-    }
-    else if(visitor.equalsTo != null)
-    {
-      Integer ret = evalComparisonExpression(visitor.equalsTo, record);
-      return Boolean.valueOf(ret != null && ret == 0);
-    }
-    else if(visitor.minorThan != null)
-    {
-      Integer ret = evalComparisonExpression(visitor.minorThan, record);
-      return Boolean.valueOf(ret != null && ret < 0);
-    }
-    else if(visitor.greaterThan != null)
-    {
-      Integer ret = evalComparisonExpression(visitor.greaterThan, record);
-      return Boolean.valueOf(ret != null && ret > 0);
-    }
-    else if(visitor.subtraction != null || visitor.addition != null)
-    {
-      Expression leftExpression = (visitor.subtraction != null ? visitor.subtraction.getLeftExpression() : visitor.addition.getLeftExpression());
-      Expression rightExpression = (visitor.subtraction != null ? visitor.subtraction.getRightExpression() : visitor.addition.getRightExpression());
-      Object leftVal = toNum(evalExpression(leftExpression, record));
-      Object rightVal = toNum(evalExpression(rightExpression, record));
-      if(leftVal != null && rightVal != null)
-      {
-        // subtraction is addition of the negated right operand
-        int multiplier = (visitor.subtraction != null ? -1 : 1);
-        if (leftVal instanceof Long && rightVal instanceof Long)
-        {
-          return Long.valueOf((Long) leftVal + (multiplier * (Long) rightVal));
-        }
-        else if (leftVal instanceof Double && rightVal instanceof Double)
-        {
-          return Double.valueOf((Double) leftVal + (multiplier * (Double) rightVal));
-        }
-      }
-    }
-    else if(visitor.andExpression != null)
-    {
-      Object leftVal = evalExpression(visitor.andExpression.getLeftExpression(), record);
-      Object rightVal = evalExpression(visitor.andExpression.getRightExpression(), record);
-      if(leftVal != null && rightVal != null)
-      {
-        if(!(leftVal instanceof Boolean) || !(rightVal instanceof Boolean))
-        {
-          throw new IllegalArgumentException("AND clause parameters don't evaluate to boolean");
-        }
-        return Boolean.valueOf(((Boolean)leftVal) && ((Boolean)rightVal));
-      }
-      else
-      {
-        return Boolean.FALSE;
-      }
-    }
-    else if(visitor.orExpression != null)
-    {
-      Object leftVal = evalExpression(visitor.orExpression.getLeftExpression(), record);
-      Object rightVal = evalExpression(visitor.orExpression.getRightExpression(), record);
-      if((leftVal != null && !(leftVal instanceof Boolean))
-          || (rightVal != null && !(rightVal instanceof Boolean)))
-      {
-        throw new IllegalArgumentException("OR clause parameters don't evaluate to boolean");
-      }
-      // a null operand counts as false, mirroring the AND branch
-      return Boolean.valueOf(Boolean.TRUE.equals(leftVal) || Boolean.TRUE.equals(rightVal));
-    }
-    else if(visitor.parenthesis != null)
-    {
-      return evalExpression(visitor.parenthesis.getExpression(), record);
-    }
-
-    return null;
-  }
-
-  /**
-   * Visits {@code exp} with a fresh visitor and evaluates the outcome against
-   * {@code record}.
-   */
-  private Object evalExpression(Expression exp, ZNRecord record)
-  {
-    //TODO: Replace logic here with generic expression evaluation
-    ExpressionVisitorImpl classified = new ExpressionVisitorImpl(exp);
-    exp.accept(classified);
-    Object value = evalExpression(classified, record);
-    return value;
-  }
-
-  /**
-   * Returns the records of {@code input} for which the WHERE predicate is true, in input
-   * order.
-   *
-   * @throws UnsupportedOperationException if the predicate does not evaluate to a Boolean
-   */
-  private List<ZNRecord> applyWhereClause(SelectVisitorImpl v,
-                                          List<ZNRecord> input)
-  {
-    List<ZNRecord> matching = new ArrayList<ZNRecord>();
-    for (ZNRecord candidate : input)
-    {
-      Object verdict = evalExpression(v.whereVisitor, candidate);
-      if (!(verdict instanceof Boolean))
-      {
-        throw new UnsupportedOperationException("Unsupported predicate in where clause:" + v.whereVisitor.expression);
-      }
-      if (((Boolean) verdict).booleanValue())
-      {
-        matching.add(candidate);
-      }
-    }
-    return matching;
-  }
-
-  /**
-   * Runs every member select of the union, concatenates their results in order and applies
-   * the LIMIT recorded on {@code v} to the combined list.
-   */
-  private List<ZNRecord> executeUnion(SelectVisitorImpl v, ZNRecordTupleReader tupleReader) throws Exception
-  {
-    @SuppressWarnings("unchecked")
-    List<PlainSelect> members = v.union.getPlainSelects();
-
-    List<ZNRecord> combined = new ArrayList<ZNRecord>();
-    for (PlainSelect member : members)
-    {
-      SelectVisitorImpl memberVisitor = new SelectVisitorImpl();
-      member.accept(memberVisitor);
-      List<ZNRecord> memberRows = executePlainSelect(memberVisitor, tupleReader);
-      combined.addAll(memberRows);
-    }
-
-    applyLimit(v.limit, combined);
-    return combined;
-  }
-
-  /**
-   * Dispatches a visited select body: unions go to executeUnion, everything else to
-   * executePlainSelect.
-   */
-  private List<ZNRecord> executeSelect(SelectVisitorImpl v, ZNRecordTupleReader tupleReader) throws Exception
-  {
-    return (v.union != null
-        ? executeUnion(v, tupleReader)
-        : executePlainSelect(v, tupleReader));
-  }
-
-  /**
-   * Resolves a column against the simple fields of {@code record}. Tried in order: the
-   * whole (qualified) column name, the bare column name, and finally any simple field whose
-   * key ends with ".columnName" (if several keys match, the last one iterated wins).
-   *
-   * @return the value, or {@code null} when the record is null or nothing matches
-   */
-  private String getSimpleColumn(ZNRecord record, Column column)
-  {
-    if (record == null)
-    {
-      return null;
-    }
-
-    String value = record.getSimpleField(column.getWholeColumnName());
-    if (value != null)
-    {
-      return value;
-    }
-
-    value = record.getSimpleField(column.getColumnName());
-    if (value != null)
-    {
-      return value;
-    }
-
-    Map<String, String> simpleFields = record.getSimpleFields();
-    for (Map.Entry<String, String> field : simpleFields.entrySet())
-    {
-      if (field.getKey().endsWith("." + column.getColumnName()))
-      {
-        value = field.getValue();
-      }
-    }
-    return value;
-  }
-
-  /**
-   * Partitions {@code input} by the values of the GROUP BY columns. The key of each group
-   * maps every group-by column that has a non-null value on the record to that value;
-   * records with no value for any group-by column are left out.
-   */
-  private Map<Map<Column, String>, List<ZNRecord>> formGroups(List<Column> groupByCols, List<ZNRecord> input)
-  {
-    Map<Map<Column, String>, List<ZNRecord>> groups = new HashMap<Map<Column,String>, List<ZNRecord>>();
-    for (ZNRecord record : input)
-    {
-      Map<Column, String> groupKey = new HashMap<Column, String>();
-      for (Column col : groupByCols)
-      {
-        String val = getSimpleColumn(record, col);
-        if (val != null)
-        {
-          groupKey.put(col, val);
-        }
-      }
-      if (groupKey.isEmpty())
-      {
-        continue;
-      }
-
-      if (!groups.containsKey(groupKey))
-      {
-        groups.put(groupKey, new ArrayList<ZNRecord>());
-      }
-      groups.get(groupKey).add(record);
-    }
-    return groups;
-  }
-
-  //TODO: Validate that only group by keys and aggregate functions are in select items
-  /**
-   * Groups {@code input} by the GROUP BY columns and emits one row per group: the
-   * aggregate computed over the group's records, extended with the group's key columns.
-   * Groups for which no aggregate is produced are skipped.
-   */
-  private List<ZNRecord> applyGroupBy(SelectVisitorImpl v, List<ZNRecord> input) throws Exception
-  {
-    List<ZNRecord> rows = new ArrayList<ZNRecord>();
-    Map<Map<Column, String>, List<ZNRecord>> groups = formGroups(v.groupByColumnReferences, input);
-    for (Map.Entry<Map<Column, String>, List<ZNRecord>> group : groups.entrySet())
-    {
-      ZNRecord aggregate = getAggregateColumns(v, group.getValue());
-      if (aggregate == null)
-      {
-        continue;
-      }
-      for (Map.Entry<Column, String> keyColumn : group.getKey().entrySet())
-      {
-        aggregate.setSimpleField(keyColumn.getKey().getWholeColumnName(), keyColumn.getValue());
-      }
-      rows.add(aggregate);
-    }
-    return rows;
-  }
-
-  //TODO: Generalize this
-  /**
-   * Evaluates the first min() or max() select item over {@code input}.
-   *
-   * Records whose argument value is null, or whose value cannot be compared with the
-   * current best, are skipped. Previously the null comparison result was unboxed and
-   * raised a NullPointerException. On ties the earliest record wins.
-   *
-   * @return a new record holding the aggregate under the item's alias (or the function name
-   *         when there is no alias); the field is left unset when the aggregate value is
-   *         null. Returns {@code null} when the select list contains no min()/max() item.
-   * @throws IllegalArgumentException if min()/max() is not given exactly one argument
-   */
-  private ZNRecord getAggregateColumns(SelectVisitorImpl v, List<ZNRecord> input) throws Exception
-  {
-    for (SelectItemVisitorImpl impl : v.selectItemVisitors)
-    {
-      if (impl.expression == null)
-      {
-        continue;
-      }
-      ExpressionVisitorImpl expressionVisitor = new ExpressionVisitorImpl(impl.expression);
-      impl.expression.accept(expressionVisitor);
-      FunctionImpl function = expressionVisitor.function;
-      if (function == null)
-      {
-        continue;
-      }
-      boolean isMax = function.name.equalsIgnoreCase("max");
-      if (!isMax && !function.name.equalsIgnoreCase("min"))
-      {
-        continue;
-      }
-      if (function.expressionVisitors.size() != 1)
-      {
-        throw new IllegalArgumentException(function.name + " needs one argument");
-      }
-
-      ExpressionVisitorImpl argument = function.expressionVisitors.get(0);
-      ZNRecord best = null;
-      for (ZNRecord record : input)
-      {
-        if (evalExpression(argument, record) == null)
-        {
-          continue; // nothing to aggregate on this record
-        }
-        if (best == null)
-        {
-          best = record;
-          continue;
-        }
-        Integer order = compareRecords(record, best, argument);
-        if (order != null && (isMax ? order > 0 : order < 0))
-        {
-          best = record;
-        }
-      }
-
-      // best stays null for empty input; evalExpression copes with a null record
-      Object val = evalExpression(argument, best);
-      ZNRecord ret = new ZNRecord("");
-      String columnName = (impl.alias != null ? impl.alias : function.name);
-      if (val != null)
-      {
-        ret.setSimpleField(columnName, "" + val);
-      }
-      return ret;
-    }
-    return null;
-  }
-
-  //return null if all columns are projected
-  /**
-   * Builds one projected record per input record from the select items. Each expression
-   * item contributes a simple field named after its alias or, when there is no alias, the
-   * whole column name, the function name, or the expression text. Items that evaluate to
-   * null contribute nothing.
-   *
-   * @return the projected records, or {@code null} as soon as an "all columns" item is met
-   *         while projecting a record
-   */
-  private List<ZNRecord> getProjection(SelectVisitorImpl v, List<ZNRecord> input) throws Exception
-  {
-    List<ZNRecord> projected = new ArrayList<ZNRecord>();
-
-    for (ZNRecord record : input)
-    {
-      ZNRecord row = new ZNRecord("");
-      for (SelectItemVisitorImpl item : v.selectItemVisitors)
-      {
-        if (item.shouldSelectAllCols)
-        {
-          return null;
-        }
-        if (item.expression == null)
-        {
-          continue;
-        }
-
-        ExpressionVisitorImpl parsed = new ExpressionVisitorImpl(item.expression);
-        item.expression.accept(parsed);
-        Object val = evalExpression(parsed, record);
-
-        String columnName = item.alias;
-        if (columnName == null)
-        {
-          if (parsed.column != null)
-          {
-            columnName = parsed.column.getWholeColumnName();
-          }
-          else if (parsed.function != null)
-          {
-            columnName = parsed.function.name;
-          }
-          else
-          {
-            columnName = item.expression.toString();
-          }
-        }
-
-        if (val != null)
-        {
-          row.setSimpleField(columnName, val.toString());
-        }
-      }
-
-      projected.add(row);
-    }
-    return projected;
-  }
-
-  /**
-   * Evaluates the argument at position {@code index} of {@code function} on {@code record}:
-   * a nested function is evaluated, a column is looked up, anything else yields the
-   * argument's literal value (possibly null).
-   */
-  private Object getParamValue(FunctionImpl function, int index, ZNRecord record)
-  {
-    ExpressionVisitorImpl param = function.expressionVisitors.get(index);
-    if (param.function != null)
-    {
-      return evalFunction(record, param.function);
-    }
-    if (param.column != null)
-    {
-      return getSimpleColumn(record, param.column);
-    }
-    return param.atomVal;
-  }
-
-  /**
-   * Evaluates a scalar function call on {@code record}. Supported, case-insensitively:
-   * concat(a, b), where null arguments count as empty strings, and to_number(x).
-   *
-   * @return the function result, or {@code null} for any other function name
-   * @throws IllegalArgumentException if a supported function has the wrong argument count
-   */
-  private Object evalFunction(ZNRecord record, FunctionImpl function)
-  {
-    int argCount = function.expressionVisitors.size();
-
-    if (function.name.equalsIgnoreCase("concat"))
-    {
-      if (argCount != 2)
-      {
-        throw new IllegalArgumentException("concat() needs 2 arguments");
-      }
-      Object first = getParamValue(function, 0, record);
-      Object second = getParamValue(function, 1, record);
-      String head = (first == null ? "" : first.toString());
-      String tail = (second == null ? "" : second.toString());
-      return head + tail;
-    }
-
-    if (function.name.equalsIgnoreCase("to_number"))
-    {
-      if (argCount != 1)
-      {
-        throw new IllegalArgumentException("to_number() needs 1 argument");
-      }
-      return toNum(getParamValue(function, 0, record));
-    }
-
-    return null;
-  }
-
-  /**
-   * Converts a value to a number. Long and Double values are returned as they are. A
-   * String is parsed as a Double when it contains a '.', otherwise as a Long.
-   *
-   * @return the number, or {@code null} for null input, unparsable strings and any other
-   *         type
-   */
-  private Object toNum(Object o)
-  {
-    if (o instanceof Long || o instanceof Double)
-    {
-      return o;
-    }
-    if (!(o instanceof String))
-    {
-      // null or an unsupported type
-      return null;
-    }
-
-    String text = (String) o;
-    try
-    {
-      if (text.contains("."))
-      {
-        return Double.parseDouble(text);
-      }
-      return Long.parseLong(text);
-    }
-    catch (NumberFormatException ignored)
-    {
-      // not a number: reported as null, by design
-      return null;
-    }
-  }
-
-  /**
-   * Materialises every FROM item of the select as a SourceTable.
-   *
-   * Plain tables are read through {@code tupleReader}. Tables whose schema is explodeList
-   * or explodeMap (produced by replaceExplodeFields) are exploded instead. Sub-selects are
-   * executed recursively. A table is named after its alias, else its table name; exploded
-   * tables and sub-selects without an alias get a generated name.
-   *
-   * Generated names now include the FROM position. Names built from the clock alone could
-   * coincide for two items created within the same millisecond, in which case one table
-   * silently replaced the other in the SourceTableList.
-   *
-   * Finally every tuple that has no "id" simple field gets one holding the record id.
-   */
-  private SourceTableList getSourceTables(SelectVisitorImpl v, ZNRecordTupleReader tupleReader) throws Exception
-  {
-    SourceTableList sourceTables = new SourceTableList();
-
-    for(int fromItemIndex : v.fromItemVisitor.tables.keySet())
-    {
-      Table t = v.fromItemVisitor.tables.get(fromItemIndex);
-      String schema = t.getSchemaName();
-      if(schema != null)
-      {
-        schema = schema.replaceAll("`", "");
-      }
-      String tableName = t.getName();
-      tableName = tableName.replaceAll("`", "");
-      List<ZNRecord> tuples = null;
-      String key = null;
-      if(schema != null && (schema.equalsIgnoreCase("explodeList") || schema.equalsIgnoreCase("explodeMap")))
-      {
-        tuples = getExplodedTable(schema, tableName, tupleReader);
-        key = (t.getAlias() != null ? t.getAlias()
-            : "xxx_random_" + System.currentTimeMillis() + "_" + fromItemIndex);
-      }
-      else
-      {
-        tuples = tupleReader.get(tableName);
-        key = (t.getAlias() != null ? t.getAlias() : tableName);
-      }
-      sourceTables.add(new SourceTable(key, fromItemIndex, tuples));
-    }
-
-    for(int fromItemIndex : v.fromItemVisitor.subSelects.keySet())
-    {
-      SelectVisitorImpl subSelect = v.fromItemVisitor.subSelects.get(fromItemIndex);
-      List<ZNRecord> tuples = executeSelect(subSelect, tupleReader);
-      String key = (subSelect.alias != null ? subSelect.alias
-          : "xxx_random_" + System.currentTimeMillis() + "_" + fromItemIndex);
-      sourceTables.add(new SourceTable(key, fromItemIndex, tuples));
-    }
-
-    //XXX: Special case of ID column
-    for(SourceTable table : sourceTables)
-    {
-      for(ZNRecord record : table.tuples)
-      {
-        if(!record.getSimpleFields().containsKey("id"))
-        {
-          record.setSimpleField("id", record.getId());
-        }
-      }
-    }
-
-    return sourceTables;
-  }
-
-  /**
-   * Builds the table for an explodeList / explodeMap reference. {@code arg} is the
-   * "_____"-separated argument string: the first part names the underlying table, the
-   * remaining parts (if any) name the fields to explode.
-   *
-   * @throws IllegalArgumentException if {@code function} is neither explodeList nor
-   *         explodeMap
-   */
-  private List<ZNRecord> getExplodedTable(String function,
-                                          String arg,
-                                          ZNRecordTupleReader tupleReader) throws Exception
-  {
-    List<String> parts = Arrays.asList(arg.split("_____"));
-    String originalTable = parts.get(0);
-    // null means "explode every field"
-    List<String> columns = null;
-    if (parts.size() > 1)
-    {
-      columns = parts.subList(1, parts.size());
-    }
-
-    if (function.equalsIgnoreCase("explodeList"))
-    {
-      return explodeListFields(originalTable, columns, tupleReader);
-    }
-    if (function.equalsIgnoreCase("explodeMap"))
-    {
-      return explodeMapFields(originalTable, columns, tupleReader);
-    }
-    throw new IllegalArgumentException(function + " is not supported");
-  }
-
-  /**
-   * Flattens map fields into rows. For every record of {@code tableName} and every
-   * requested map field present on it (all map fields when {@code mapColumns} is null), one
-   * row is emitted that carries the record's simple fields, a "mapField" column holding the
-   * map field's name, and one simple field per entry of that map.
-   */
-  private List<ZNRecord> explodeMapFields(String tableName,
-                                          Collection<String> mapColumns,
-                                          ZNRecordTupleReader tupleReader) throws Exception
-  {
-    List<ZNRecord> rows = new ArrayList<ZNRecord>();
-    for (ZNRecord source : tupleReader.get(tableName))
-    {
-      Collection<String> wanted = mapColumns;
-      if (wanted == null)
-      {
-        wanted = source.getMapFields().keySet();
-      }
-
-      for (String mapCol : wanted)
-      {
-        if (!source.getMapFields().containsKey(mapCol))
-        {
-          continue;
-        }
-        ZNRecord row = new ZNRecord("");
-        row.getSimpleFields().putAll(source.getSimpleFields());
-        row.setSimpleField("mapField", mapCol);
-        for (Map.Entry<String, String> entry : source.getMapField(mapCol).entrySet())
-        {
-          row.setSimpleField(entry.getKey(), entry.getValue());
-        }
-        rows.add(row);
-      }
-    }
-    return rows;
-  }
-
-  /**
-   * Flattens list fields into rows. For every record of {@code tableName}, every requested
-   * list field present on it (all list fields when {@code listColumns} is null) and every
-   * element of that list, one row is emitted that carries the record's simple fields plus
-   * "listField" (the field name), "listIndex" (the element position) and "listVal" (the
-   * element).
-   */
-  private List<ZNRecord> explodeListFields(String tableName,
-                                           Collection<String> listColumns,
-                                           ZNRecordTupleReader tupleReader) throws Exception
-  {
-    List<ZNRecord> rows = new ArrayList<ZNRecord>();
-    for (ZNRecord source : tupleReader.get(tableName))
-    {
-      Collection<String> wanted = listColumns;
-      if (wanted == null)
-      {
-        wanted = source.getListFields().keySet();
-      }
-
-      for (String listCol : wanted)
-      {
-        if (!source.getListFields().containsKey(listCol))
-        {
-          continue;
-        }
-        List<String> elements = source.getListField(listCol);
-        int position = 0;
-        while (position < elements.size())
-        {
-          ZNRecord row = new ZNRecord("");
-          row.getSimpleFields().putAll(source.getSimpleFields());
-          row.setSimpleField("listField", listCol);
-          row.setSimpleField("listIndex", "" + position);
-          row.setSimpleField("listVal", elements.get(position));
-          rows.add(row);
-          position++;
-        }
-      }
-    }
-    return rows;
-  }
-}
-
-class ZNRecordSet extends ZNRecord
-{
- private HashMap<String, ZNRecord> _records = new HashMap<String, ZNRecord>();
-
- public ZNRecordSet(String id)
- {
- super(id);
- // TODO Auto-generated constructor stub
- }
-
- public void addRecord(String tableName, ZNRecord record)
- {
- _records.put(tableName, record);
- }
-
- @Override
- public String getSimpleField(String key)
- {
- if(key.contains("."))
- {
- String tmp[] = key.split("\\.");
- if(_records.containsKey(tmp[0]))
- {
- return _records.get(tmp[0]).getSimpleField(tmp[1]);
- }
- }
- else
- {
- for(ZNRecord record : _records.values())
- {
- if(record.getSimpleFields().containsKey(key))
- {
- return record.getSimpleField(key);
- }
- }
- }
-
- return null;
- }
-
- @Override
- public void setSimpleField(String key, String val)
- {
- if(key.contains("."))
- {
- String tmp[] = key.split("\\.");
- if(_records.containsKey(tmp[0]))
- {
- _records.get(tmp[0]).setSimpleField(tmp[1], val);
- }
- else
- {
- ZNRecord record = new ZNRecord("");
- record.setSimpleField(tmp[1], val);
- addRecord(tmp[0], record);
- }
- }
- else
- {
- if(_records.size() == 0)
- {
- ZNRecord record = new ZNRecord("");
- record.setSimpleField(key, val);
- addRecord("xxx_random_" + System.currentTimeMillis(), record);
- }
- else
- {
- _records.get(_records.keySet().iterator().next()).setSimpleField(key, val);
- }
- }
- }
-
- @Override
- public String toString()
- {
- return "@" + _records.toString() + "@";
- }
-}
-
-/**
- * The source tables of one select, addressable by name and by FROM position. Iteration
- * goes over the name index, so its order is that of a HashMap, not the FROM order.
- */
-class SourceTableList implements Iterable<SourceTable>
-{
-  // tables by name; a later table with the same name replaces the earlier one
-  Map<String, SourceTable> _nameMap = new HashMap<String, SourceTable>();
-  // tables by FROM position
-  Map<Integer, SourceTable> _indexMap = new HashMap<Integer, SourceTable>();
-
-  public void add(SourceTable table)
-  {
-    _indexMap.put(table.fromItemIndex, table);
-    _nameMap.put(table.name, table);
-  }
-
-  /** Number of distinct table names. */
-  public int size()
-  {
-    return _nameMap.size();
-  }
-
-  /**
-   * Returns the table registered at the given FROM position.
-   *
-   * @throws IndexOutOfBoundsException if {@code index} is not below the number of
-   *         registered positions
-   */
-  public SourceTable get(int index)
-  {
-    if (index < _indexMap.size())
-    {
-      return _indexMap.get(index);
-    }
-    throw new IndexOutOfBoundsException();
-  }
-
-  public SourceTable get(String name)
-  {
-    return _nameMap.get(name);
-  }
-
-  public boolean containsKey(String name)
-  {
-    return _nameMap.containsKey(name);
-  }
-
-  @Override
-  public Iterator<SourceTable> iterator()
-  {
-    return _nameMap.values().iterator();
-  }
-
-  @Override
-  public String toString()
-  {
-    return _nameMap.toString();
-  }
-}
-
-/**
- * One table taking part in a query: its name (alias, table name or a generated name), the
- * position of its FROM item, and its rows.
- */
-class SourceTable
-{
-  String name;
-  int fromItemIndex;
-  List<ZNRecord> tuples;
-
-  public SourceTable(String name, int fromItemIndex, List<ZNRecord> tuples)
-  {
-    this.tuples = tuples;
-    this.fromItemIndex = fromItemIndex;
-    this.name = name;
-  }
-
-  @Override
-  public String toString()
-  {
-    StringBuilder text = new StringBuilder();
-    text.append(name).append("->").append(tuples);
-    return text.toString();
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordRow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordRow.java b/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordRow.java
deleted file mode 100644
index 08a3022..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordRow.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.josql;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.linkedin.helix.ZNRecord;
-
-/**
- * A normalized ("flattened") form of a {@link ZNRecord}.
- * <p>
- * Each row describes exactly one simple field, one list element or one map entry of a
- * record, stored as a flat map from column name to string value. Every column listed
- * below is present in every row; columns that do not apply to a row keep the empty
- * string they are initialized with.
- */
-public class ZNRecordRow
-{
-  // "Field names" in the flattened ZNRecord
-  public static final String SIMPLE_KEY = "simpleKey";
-  public static final String SIMPLE_VALUE = "simpleValue";
-
-  public static final String LIST_KEY = "listKey";
-  public static final String LIST_VALUE = "listValue";
-  public static final String LIST_VALUE_INDEX = "listValueIndex";
-
-  public static final String MAP_KEY = "mapKey";
-  public static final String MAP_SUBKEY = "mapSubKey";
-  public static final String MAP_VALUE = "mapValue";
-  public static final String ZNRECORD_ID = "recordId";
-  // ZNRECORD path ?
-
-
-  final Map<String, String> _rowDataMap = new HashMap<String, String>();
-
-  /**
-   * Creates a row in which every known column holds the empty string.
-   */
-  public ZNRecordRow()
-  {
-    _rowDataMap.put(SIMPLE_KEY, "");
-    _rowDataMap.put(SIMPLE_VALUE, "");
-    _rowDataMap.put(LIST_KEY, "");
-    _rowDataMap.put(LIST_VALUE, "");
-    _rowDataMap.put(LIST_VALUE_INDEX, "");
-    _rowDataMap.put(MAP_KEY, "");
-    _rowDataMap.put(MAP_SUBKEY, "");
-    _rowDataMap.put(MAP_VALUE, "");
-    _rowDataMap.put(ZNRECORD_ID, "");
-  }
-
-  /**
-   * @param rowField a column name, normally one of the constants of this class
-   * @return the value stored for {@code rowField}, or {@code null} if the column has
-   *         never been set
-   */
-  public String getField(String rowField)
-  {
-    return _rowDataMap.get(rowField);
-  }
-
-  /**
-   * Sets (or overwrites) the value of a column.
-   *
-   * @param fieldName  the column name
-   * @param fieldValue the value to store
-   */
-  public void putField(String fieldName, String fieldValue)
-  {
-    _rowDataMap.put(fieldName, fieldValue);
-  }
-
-  /** @return the value of the {@link #LIST_VALUE_INDEX} column */
-  public String getListValueIndex()
-  {
-    return getField(LIST_VALUE_INDEX);
-  }
-
-  /** @return the value of the {@link #SIMPLE_KEY} column */
-  public String getSimpleKey()
-  {
-    return getField(SIMPLE_KEY);
-  }
-
-  /** @return the value of the {@link #SIMPLE_VALUE} column */
-  public String getSimpleValue()
-  {
-    return getField(SIMPLE_VALUE);
-  }
-
-  /** @return the value of the {@link #LIST_KEY} column */
-  public String getListKey()
-  {
-    return getField(LIST_KEY);
-  }
-
-  /** @return the value of the {@link #LIST_VALUE} column */
-  public String getListValue()
-  {
-    return getField(LIST_VALUE);
-  }
-
-  /** @return the value of the {@link #MAP_KEY} column */
-  public String getMapKey()
-  {
-    return getField(MAP_KEY);
-  }
-
-  /** @return the value of the {@link #MAP_SUBKEY} column */
-  public String getMapSubKey()
-  {
-    return getField(MAP_SUBKEY);
-  }
-
-  /** @return the value of the {@link #MAP_VALUE} column */
-  public String getMapValue()
-  {
-    return getField(MAP_VALUE);
-  }
-
-  /** @return the value of the {@link #ZNRECORD_ID} column */
-  public String getRecordId()
-  {
-    return getField(ZNRECORD_ID);
-  }
-
-  /* Josql function handlers */
-
-  /**
-   * Static form of {@link #getField(String)}.
-   *
-   * @param row      the row to read from
-   * @param rowField the column name
-   * @return the value {@code row} stores for {@code rowField}
-   */
-  public static String getField(ZNRecordRow row, String rowField)
-  {
-    return row.getField(rowField);
-  }
-
-  /**
-   * Produces one row per simple field of {@code record}, filling the record-id,
-   * simple-key and simple-value columns.
-   *
-   * @param record the record to flatten
-   * @return the rows, one per simple field
-   */
-  public static List<ZNRecordRow> convertSimpleFields(ZNRecord record)
-  {
-    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
-    for (String key : record.getSimpleFields().keySet())
-    {
-      ZNRecordRow row = new ZNRecordRow();
-      row.putField(ZNRECORD_ID, record.getId());
-      row.putField(SIMPLE_KEY, key);
-      row.putField(SIMPLE_VALUE, record.getSimpleField(key));
-      result.add(row);
-    }
-    return result;
-  }
-
-  /**
-   * Produces one row per element of every list field of {@code record}, filling the
-   * record-id, list-key, list-value and list-value-index columns. The index is the
-   * zero-based position of the element within its list.
-   *
-   * @param record the record to flatten
-   * @return the rows, one per list element
-   */
-  public static List<ZNRecordRow> convertListFields(ZNRecord record)
-  {
-    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
-    for (String key : record.getListFields().keySet())
-    {
-      int order = 0;
-      for (String value : record.getListField(key))
-      {
-        ZNRecordRow row = new ZNRecordRow();
-        row.putField(ZNRECORD_ID, record.getId());
-        row.putField(LIST_KEY, key);
-        // The row's value is the list element itself, not a simple field that happens
-        // to share the list's key.
-        row.putField(LIST_VALUE, value);
-        row.putField(LIST_VALUE_INDEX, "" + order);
-        order++;
-        result.add(row);
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Produces one row per entry of every map field of {@code record}, filling the
-   * record-id, map-key, map-sub-key and map-value columns.
-   *
-   * @param record the record to flatten
-   * @return the rows, one per map entry
-   */
-  public static List<ZNRecordRow> convertMapFields(ZNRecord record)
-  {
-    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
-    for (String key0 : record.getMapFields().keySet())
-    {
-      for (String key1 : record.getMapField(key0).keySet())
-      {
-        ZNRecordRow row = new ZNRecordRow();
-        row.putField(ZNRECORD_ID, record.getId());
-        row.putField(MAP_KEY, key0);
-        row.putField(MAP_SUBKEY, key1);
-        row.putField(MAP_VALUE, record.getMapField(key0).get(key1));
-        result.add(row);
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Flattens a single record into rows: map-field rows first, then list-field rows,
-   * then simple-field rows.
-   *
-   * @param record the record to flatten
-   * @return all rows derived from {@code record}
-   */
-  public static List<ZNRecordRow> flatten(ZNRecord record)
-  {
-    List<ZNRecordRow> result = convertMapFields(record);
-    result.addAll(convertListFields(record));
-    result.addAll(convertSimpleFields(record));
-    return result;
-  }
-
-  /**
-   * Flattens every record of a collection and concatenates the resulting rows in the
-   * collection's iteration order.
-   *
-   * @param recordList the records to flatten
-   * @return all rows derived from the records
-   */
-  public static List<ZNRecordRow> flatten(Collection<ZNRecord> recordList)
-  {
-    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
-    for (ZNRecord record : recordList)
-    {
-      result.addAll(flatten(record));
-    }
-    return result;
-  }
-
-  /**
-   * @param rowMap a map from key to row list
-   * @param key    the key to look up
-   * @return the row list stored under {@code key}, or {@code null} if there is none
-   */
-  public static List<ZNRecordRow> getRowListFromMap(Map<String, List<ZNRecordRow>> rowMap, String key)
-  {
-    return rowMap.get(key);
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/package-info.java b/helix-core/src/main/java/com/linkedin/helix/josql/package-info.java
deleted file mode 100644
index 7b4e85d..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * JoSQL processor for Helix
- *
- */
-package com.linkedin.helix.josql;
\ No newline at end of file