You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:40 UTC

[13/47] Refactoring from com.linkedin.helix to org.apache.helix

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/StatHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/StatHealthReportProvider.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/StatHealthReportProvider.java
deleted file mode 100644
index 720eea5..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/StatHealthReportProvider.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-public class StatHealthReportProvider extends HealthReportProvider
-{
-
-  private static final Logger _logger = Logger
-      .getLogger(StatHealthReportProvider.class);
-
-  /*
-   * public final static String _testStat = "testStat"; public final static
-   * String _readLatencyStat = "readLatencyStat"; public final static String
-   * _requestCountStat = "requestCountStat"; public final static String
-   * _partitionRequestCountStat = "partitionRequestCountStat";
-   */
-
-  public static final String REPORT_NAME = "ParticipantStats";
-  public String _reportName = REPORT_NAME;
-
-  public static final String STAT_VALUE = "value";
-  public static final String TIMESTAMP = "timestamp";
-
-  public int readLatencyCount = 0;
-  public double readLatencySum = 0;
-
-  public int requestCount = 0;
-
-  // private final Map<String, String> _partitionCountsMap = new HashMap<String,
-  // String>();
-
-  // private final Map<String, HashMap<String,String>> _partitionStatMaps = new
-  // HashMap<String, HashMap<String,String>>();
-  private final ConcurrentHashMap<String, String> _statsToValues = new ConcurrentHashMap<String, String>();
-  private final ConcurrentHashMap<String, String> _statsToTimestamps = new ConcurrentHashMap<String, String>();
-
-  public StatHealthReportProvider()
-  {
-  }
-
-  @Override
-  public Map<String, String> getRecentHealthReport()
-  {
-    return null;
-  }
-
-  // TODO: function is misnamed, but return type is what I want
-  @Override
-  public Map<String, Map<String, String>> getRecentPartitionHealthReport()
-  {
-    Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
-    for (String stat : _statsToValues.keySet())
-    {
-      Map<String, String> currStat = new HashMap<String, String>();
-      /*
-       * currStat.put(Stat.OP_TYPE, stat._opType);
-       * currStat.put(Stat.MEASUREMENT_TYPE, stat._measurementType);
-       * currStat.put(Stat.NODE_NAME, stat._nodeName);
-       * currStat.put(Stat.PARTITION_NAME, stat._partitionName);
-       * currStat.put(Stat.RESOURCE_NAME, stat._resourceName);
-       * currStat.put(Stat.RETURN_STATUS, stat._returnStatus);
-       * currStat.put(Stat.METRIC_NAME, stat._metricName);
-       * currStat.put(Stat.AGG_TYPE, stat._aggTypeName);
-       */
-      currStat.put(TIMESTAMP, _statsToTimestamps.get(stat));
-      currStat.put(STAT_VALUE, _statsToValues.get(stat));
-      result.put(stat, currStat);
-    }
-    return result;
-  }
-
-  public boolean contains(Stat inStat)
-  {
-    return _statsToValues.containsKey(inStat);
-  }
-
-  public Set<String> keySet()
-  {
-    return _statsToValues.keySet();
-  }
-
-  public String getStatValue(Stat inStat)
-  {
-    return _statsToValues.get(inStat);
-  }
-
-  public long getStatTimestamp(Stat inStat)
-  {
-    return Long.parseLong(_statsToTimestamps.get(inStat));
-  }
-
-  /*
-   * public String getStatValue(String opType, String measurementType, String
-   * resourceName, String partitionName, String nodeName, boolean
-   * createIfMissing) { Stat rs = new Stat(opType, measurementType,
-   * resourceName, partitionName, nodeName); String val =
-   * _statsToValues.get(rs); if (val == null && createIfMissing) { val = "0";
-   * _statsToValues.put(rs, val); } return val; }
-   */
-
-  public void writeStat(String statName, String val, String timestamp)
-  {
-    _statsToValues.put(statName, val);
-    _statsToTimestamps.put(statName, timestamp);
-  }
-
-  /*
-   * public void setStat(Stat es, String val, String timestamp) { writeStat(es,
-   * val, timestamp); }
-   * 
-   * public void setStat(String opType, String measurementType, String
-   * resourceName, String partitionName, String nodeName, double val, String
-   * timestamp) { Stat rs = new Stat(opType, measurementType, resourceName,
-   * partitionName, nodeName); writeStat(rs, String.valueOf(val), timestamp); }
-   */
-
-  public void incrementStat(String statName, String timestamp)
-  {
-    // Stat rs = new Stat(opType, measurementType, resourceName, partitionName,
-    // nodeName);
-    String val = _statsToValues.get(statName);
-    if (val == null)
-    {
-      val = "0";
-    }
-    else
-    {
-      val = String.valueOf(Double.parseDouble(val) + 1);
-    }
-    writeStat(statName, val, timestamp);
-  }
-
-  public int size()
-  {
-    return _statsToValues.size();
-  }
-
-  public void resetStats()
-  {
-    _statsToValues.clear();
-    _statsToTimestamps.clear();
-  }
-
-  public void setReportName(String name)
-  {
-    _reportName = name;
-  }
-
-  public String getReportName()
-  {
-    return _reportName;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/WindowAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/WindowAggregationType.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/WindowAggregationType.java
deleted file mode 100644
index 0bb488a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/WindowAggregationType.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.util.TimerTask;
-
-import org.apache.log4j.Logger;
-
-public class WindowAggregationType implements AggregationType
-{
-
-  private static final Logger logger = Logger
-      .getLogger(WindowAggregationType.class);
-
-  public final String WINDOW_DELIM = "#";
-
-  public final static String TYPE_NAME = "window";
-
-  int _windowSize = 1;
-
-  public WindowAggregationType(int ws)
-  {
-    super();
-    _windowSize = ws;
-  }
-
-  @Override
-  public String getName()
-  {
-    StringBuilder sb = new StringBuilder();
-    sb.append(TYPE_NAME);
-    sb.append(DELIM);
-    sb.append(_windowSize);
-    return sb.toString();
-  }
-
-  @Override
-  public String merge(String incomingVal, String existingVal, long prevTimestamp)
-  {
-    String[] windowVals;
-    if (existingVal == null)
-    {
-      return incomingVal;
-    }
-    else
-    {
-      windowVals = existingVal.split(WINDOW_DELIM);
-      int currLength = windowVals.length;
-      // window not full
-      if (currLength < _windowSize)
-      {
-        return existingVal + WINDOW_DELIM + incomingVal;
-      }
-      // evict oldest
-      else
-      {
-        int firstDelim = existingVal.indexOf(WINDOW_DELIM);
-        return existingVal.substring(firstDelim + 1) + WINDOW_DELIM
-            + incomingVal;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/package-info.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/package-info.java
deleted file mode 100644
index dd64f06..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Helix health check classes
- * 
- */
-package com.linkedin.helix.healthcheck;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/ClusterJosqlQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/ClusterJosqlQueryProcessor.java b/helix-core/src/main/java/com/linkedin/helix/josql/ClusterJosqlQueryProcessor.java
deleted file mode 100644
index d96598a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/ClusterJosqlQueryProcessor.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.josql;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.josql.Query;
-import org.josql.QueryExecutionException;
-import org.josql.QueryParseException;
-import org.josql.QueryResults;
-
-import com.linkedin.helix.ConfigScope.ConfigScopeProperty;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.LiveInstance.LiveInstanceProperty;
-
-public class ClusterJosqlQueryProcessor
-{
-  public static final String PARTITIONS = "PARTITIONS";
-  public static final String FLATTABLE = ".Table";
-
-  HelixManager _manager;
-  private static Logger _logger = Logger.getLogger(ClusterJosqlQueryProcessor.class);
-
-  public ClusterJosqlQueryProcessor(HelixManager manager)
-  {
-    _manager = manager;
-  }
-
-  String parseFromTarget(String sql)
-  {
-    // We need to find out the "FROM" target, and replace it with liveInstances
-    // / partitions etc
-    int fromIndex = sql.indexOf("FROM");
-    if (fromIndex == -1)
-    {
-      throw new HelixException("Query must contain FROM target. Query: " + sql);
-    }
-    // Per JoSql, select FROM <target> the target must be a object class that
-    // corresponds to a "table row"
-    // In out case, the row is always a ZNRecord
-
-    int nextSpace = sql.indexOf(" ", fromIndex);
-    while (sql.charAt(nextSpace) == ' ')
-    {
-      nextSpace++;
-    }
-    int nextnextSpace = sql.indexOf(" ", nextSpace);
-    if (nextnextSpace == -1)
-    {
-      nextnextSpace = sql.length();
-    }
-    String fromTarget = sql.substring(nextSpace, nextnextSpace).trim();
-
-    if (fromTarget.length() == 0)
-    {
-      throw new HelixException("FROM target in the query cannot be empty. Query: " + sql);
-    }
-    return fromTarget;
-  }
-
-  public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
-      List<Object> additionalFunctionHandlers, List queryTarget) throws QueryParseException,
-      QueryExecutionException
-  {
-    Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
-
-    josqlQuery.parse(josql);
-    QueryResults qr = josqlQuery.execute(queryTarget);
-
-    return qr.getResults();
-  }
-
-  Query prepareQuery(Map<String, Object> bindVariables, List<Object> additionalFunctionHandlers)
-  {
-    // DataAccessor accessor = _manager.getDataAccessor();
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-    
-    // Get all the ZNRecords in the cluster and set them as bind variables
-    Builder keyBuilder = accessor.keyBuilder();
-//    List<ZNRecord> instanceConfigs = accessor.getChildValues(PropertyType.CONFIGS,
-//        ConfigScopeProperty.PARTICIPANT.toString());
-    
-    List<ZNRecord> instanceConfigs = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
-
-    List<ZNRecord> liveInstances = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.liveInstances()));
-    List<ZNRecord> stateModelDefs = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.stateModelDefs()));
-    
-    // Idealstates are stored in a map from resource name to idealState ZNRecord
-    List<ZNRecord> idealStateList = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.idealStates()));
-    
-    Map<String, ZNRecord> idealStatesMap = new HashMap<String, ZNRecord>();
-    for (ZNRecord idealState : idealStateList)
-    {
-      idealStatesMap.put(idealState.getId(), idealState);
-    }
-    // Make up the partition list: for selecting partitions
-    List<ZNRecord> partitions = new ArrayList<ZNRecord>();
-    for (ZNRecord idealState : idealStateList)
-    {
-      for (String partitionName : idealState.getMapFields().keySet())
-      {
-        partitions.add(new ZNRecord(partitionName));
-      }
-    }
-    
-    List<ZNRecord> externalViewList = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.externalViews()));
-    // ExternalViews are stored in a map from resource name to idealState
-    // ZNRecord
-    Map<String, ZNRecord> externalViewMap = new HashMap<String, ZNRecord>();
-    for (ZNRecord externalView : externalViewList)
-    {
-      externalViewMap.put(externalView.getId(), externalView);
-    }
-    // Map from instance name to a map from resource to current state ZNRecord
-    Map<String, Map<String, ZNRecord>> currentStatesMap = new HashMap<String, Map<String, ZNRecord>>();
-    // Map from instance name to a list of combined flat ZNRecordRow
-    Map<String, List<ZNRecordRow>> flatCurrentStateMap = new HashMap<String, List<ZNRecordRow>>();
-
-    for (ZNRecord instance : liveInstances)
-    {
-      String host = instance.getId();
-      String sessionId = instance.getSimpleField(LiveInstanceProperty.SESSION_ID.toString());
-      Map<String, ZNRecord> currentStates = new HashMap<String, ZNRecord>();
-      List<ZNRecord> instanceCurrentStateList = new ArrayList<ZNRecord>();
-      for (ZNRecord idealState : idealStateList)
-      {
-        String resourceName = idealState.getId();
-        
-        HelixProperty property = accessor.getProperty(keyBuilder.currentState(host, sessionId, resourceName));
-        ZNRecord currentState =null;
-        if (property == null)
-        {
-          _logger.warn("Resource " + resourceName + " has null currentState");
-          currentState = new ZNRecord(resourceName);
-        }else{
-          currentState = property.getRecord();
-        }
-        currentStates.put(resourceName, currentState);
-        instanceCurrentStateList.add(currentState);
-      }
-      currentStatesMap.put(host, currentStates);
-      flatCurrentStateMap.put(host, ZNRecordRow.flatten(instanceCurrentStateList));
-    }
-    Query josqlQuery = new Query();
-
-    // Set the default bind variables
-    josqlQuery
-.setVariable(
-        PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString(),
-            instanceConfigs);
-    josqlQuery.setVariable(PropertyType.IDEALSTATES.toString(), idealStatesMap);
-    josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString(), liveInstances);
-    josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString(), stateModelDefs);
-    josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString(), externalViewMap);
-    josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString(), currentStatesMap);
-    josqlQuery.setVariable(PARTITIONS, partitions);
-
-    // Flat version of ZNRecords
-    josqlQuery.setVariable(
-        PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString()
-            + FLATTABLE,
-        ZNRecordRow.flatten(instanceConfigs));
-    josqlQuery.setVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE,
-        ZNRecordRow.flatten(idealStateList));
-    josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE,
-        ZNRecordRow.flatten(liveInstances));
-    josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE,
-        ZNRecordRow.flatten(stateModelDefs));
-    josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE,
-        ZNRecordRow.flatten(externalViewList));
-    josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString() + FLATTABLE,
-        flatCurrentStateMap.values());
-    josqlQuery.setVariable(PARTITIONS + FLATTABLE, ZNRecordRow.flatten(partitions));
-    // Set additional bind variables
-    if (bindVariables != null)
-    {
-      for (String key : bindVariables.keySet())
-      {
-        josqlQuery.setVariable(key, bindVariables.get(key));
-      }
-    }
-
-    josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
-    josqlQuery.addFunctionHandler(new ZNRecordRow());
-    josqlQuery.addFunctionHandler(new Integer(0));
-    if (additionalFunctionHandlers != null)
-    {
-      for (Object functionHandler : additionalFunctionHandlers)
-      {
-        josqlQuery.addFunctionHandler(functionHandler);
-      }
-    }
-    return josqlQuery;
-  }
-
-  public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
-      List<Object> additionalFunctionHandlers) throws QueryParseException, QueryExecutionException
-  {
-    Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
-
-    // Per JoSql, select FROM <target> the target must be a object class that
-    // corresponds to a "table row",
-    // while the table (list of Objects) are put in the query by
-    // query.execute(List<Object>). In the input,
-    // In out case, the row is always a ZNRecord. But in SQL, the from target is
-    // a "table name".
-
-    String fromTargetString = parseFromTarget(josql);
-
-    List fromTargetList = null;
-    Object fromTarget = null;
-    if (fromTargetString.equalsIgnoreCase(PARTITIONS))
-    {
-      fromTarget = josqlQuery.getVariable(PARTITIONS.toString());
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.LIVEINSTANCES.toString()))
-    {
-      fromTarget = josqlQuery.getVariable(PropertyType.LIVEINSTANCES.toString());
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.CONFIGS.toString() + "/"
-        + ConfigScopeProperty.PARTICIPANT.toString()))
-    {
-      fromTarget = josqlQuery.getVariable(PropertyType.CONFIGS.toString() + "/"
-          + ConfigScopeProperty.PARTICIPANT.toString());
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.STATEMODELDEFS.toString()))
-    {
-      fromTarget = josqlQuery.getVariable(PropertyType.STATEMODELDEFS.toString());
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.EXTERNALVIEW.toString()))
-    {
-      fromTarget = josqlQuery.getVariable(PropertyType.EXTERNALVIEW.toString());
-    } 
-    else if (fromTargetString.equalsIgnoreCase(PropertyType.IDEALSTATES.toString()))
-    {
-      fromTarget = josqlQuery.getVariable(PropertyType.IDEALSTATES.toString());
-    }
-    
-    else if (fromTargetString.equalsIgnoreCase(PARTITIONS + FLATTABLE))
-    {
-      fromTarget = josqlQuery.getVariable(PARTITIONS.toString() + FLATTABLE);
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.LIVEINSTANCES.toString() + FLATTABLE))
-    {
-      fromTarget = josqlQuery.getVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE);
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.CONFIGS.toString() + "/"
-        + ConfigScopeProperty.PARTICIPANT.toString()
-        + FLATTABLE))
-    {
-      fromTarget = josqlQuery.getVariable(PropertyType.CONFIGS.toString() + "/"
-          + ConfigScopeProperty.PARTICIPANT.toString() + FLATTABLE);
-    } else if (fromTargetString
-        .equalsIgnoreCase(PropertyType.STATEMODELDEFS.toString() + FLATTABLE))
-    {
-      fromTarget = josqlQuery.getVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE);
-    } else if (fromTargetString.equalsIgnoreCase(PropertyType.EXTERNALVIEW.toString() + FLATTABLE))
-    {
-      fromTarget = josqlQuery.getVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE);
-    }
-    else if (fromTargetString.equalsIgnoreCase(PropertyType.IDEALSTATES.toString() + FLATTABLE))
-    {
-      fromTarget = josqlQuery.getVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE);
-    }
-    else
-    {
-      throw new HelixException(
-          "Unknown query target "
-              + fromTargetString
-              + ". Target should be PARTITIONS, LIVEINSTANCES, CONFIGS, STATEMODELDEFS, IDEALSTATES, EXTERNALVIEW, and corresponding flat Tables");
-    }
-
-    fromTargetList = fromTargetString.endsWith(FLATTABLE) ? ((List<ZNRecordRow>) fromTarget)
-        : ((List<ZNRecord>) fromTarget);
-
-    // Per JoSql, select FROM <target> the target must be a object class that
-    // corresponds to a "table row"
-    // In out case, the row is always a ZNRecord
-    josql = josql.replaceFirst(
-        fromTargetString,
-        fromTargetString.endsWith(FLATTABLE) ? ZNRecordRow.class.getName() : ZNRecord.class
-            .getName());
-    josqlQuery.parse(josql);
-    QueryResults qr = josqlQuery.execute(fromTargetList);
-    return qr.getResults();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/DataAccessorBasedTupleReader.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/DataAccessorBasedTupleReader.java b/helix-core/src/main/java/com/linkedin/helix/josql/DataAccessorBasedTupleReader.java
deleted file mode 100644
index 855b7f7..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/DataAccessorBasedTupleReader.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package com.linkedin.helix.josql;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-
-public class DataAccessorBasedTupleReader implements ZNRecordQueryProcessor.ZNRecordTupleReader
-{
-  Map<String , List<ZNRecord>> _cache = new HashMap<String, List<ZNRecord>>();
-  private final HelixDataAccessor _dataAccessor;
-  private final String _clusterName;
-
-  public DataAccessorBasedTupleReader(HelixDataAccessor dataAccessor, String clusterName)
-  {
-    _dataAccessor = dataAccessor;
-    _clusterName = clusterName;
-  }
-
-  @Override
-  public List<ZNRecord> get(String path) throws Exception
-  {
-    //first check cache
-    if(_cache.containsKey(path))
-    {
-      return _cache.get(path);
-    }
-    
-    String[] tmp = path.split("/");
-    if(tmp.length == 0 || tmp[0].equals("*"))
-    {
-      throw new Exception("Unable to read " + path);
-    }
-    
-    PropertyType parentProperty = null;
-    try
-    {
-      parentProperty = PropertyType.valueOf(tmp[0]);
-    }
-    catch(Exception e)
-    {
-      throw new Exception("Unable to translate " + tmp[0] + " into a valid property type", e);
-    }
-    
-    boolean pathHasWildcard = path.contains("*");
-    String childKeys[] = new String[tmp.length-1];
-    for(int i = 1; i < tmp.length; i++)
-    {
-      childKeys[i-1] = tmp[i];
-    }
-    
-    List<ZNRecord> ret = null;
-    String parentPath = PropertyPathConfig.getPath(parentProperty, _clusterName);
-
-    if(pathHasWildcard)
-    {
-      List<String> paths = expandWildCards(parentProperty, childKeys);
-      ret = new ArrayList<ZNRecord>();
-      for(String expandedPath : paths)
-      {
-        String fullPath = parentPath + "/" + expandedPath;
-        try
-        {
-          ZNRecord record = _dataAccessor.getBaseDataAccessor().get(fullPath, null, 0);
-          if(record != null)
-          {
-            ret.add(record);
-          }
-        } catch (ZkNoNodeException e)
-        {
-          // OK.
-        }
-      }
-    }
-    else
-    {
-      String propertyPath = PropertyPathConfig.getPath(parentProperty, _clusterName, childKeys);
-      ret = _dataAccessor.getBaseDataAccessor().getChildren(propertyPath, null, 0);
-      //ret = _dataAccessor.getChildValues(parentProperty, childKeys);
-      if(ret.size() == 0) //could be a leaf node - try accessing property directly
-      {
-        String fullPath = parentPath;
-        for(String childKey : childKeys)
-        {
-          fullPath += "/" + childKey;
-        }
-        ZNRecord record = _dataAccessor.getBaseDataAccessor().get(fullPath, null, 0);
-        if(record != null)
-        {
-          ret = Arrays.asList(record);
-        }
-      }
-    }
-    
-    _cache.put(path, ret);
-    return ret;
-  }
-  
-  private List<String> expandWildCards(PropertyType parentProperty, String[] pathElements)
-  {
-    if(pathElements.length == 0)
-    {
-      return Collections.emptyList();
-    }
-    
-    String path = "";
-    for (int i = 0; i < pathElements.length; i++)
-    {
-      path += pathElements[i];
-      if(i != pathElements.length -1)
-      {
-        path += "/";
-      }
-    }
-    if(!path.contains("*"))
-    {
-      return Arrays.asList(path);
-    }
-    else
-    {
-      List<String> prefix = new ArrayList<String>();
-  
-      for (int i = 0; i < pathElements.length; i++)
-      {
-        if(!pathElements[i].equals("*"))
-        {
-          prefix.add(pathElements[i]);
-        }
-        else
-        {
-          List<String> ret = new ArrayList<String>();
-          Set<String> childNames = new HashSet<String>();
-          //List<String> childNamesList = _dataAccessor.getChildNames(parentProperty, prefix.toArray(new String[0]));
-          String parentPath = PropertyPathConfig.getPath(parentProperty, _clusterName,prefix.toArray(new String[0]));
-          List<String> childNamesList = _dataAccessor.getBaseDataAccessor().getChildNames(parentPath, 0);
-          childNames.addAll(childNamesList);
-          for(String child : childNames)
-          {
-            pathElements[i] = child;
-            ret.addAll(expandWildCards(parentProperty, pathElements));
-          }
-          return ret;
-        }
-      }
-    }
-    
-    return Collections.emptyList();
-  }
-
-  public void setTempTable(String path, List<ZNRecord> table)
-  {
-    _cache.put(path, table);
-  }
-
-  @Override
-  public void reset()
-  {
-    _cache.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/JsqlQueryListProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/JsqlQueryListProcessor.java b/helix-core/src/main/java/com/linkedin/helix/josql/JsqlQueryListProcessor.java
deleted file mode 100644
index 766ef64..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/JsqlQueryListProcessor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.linkedin.helix.josql;
-
-import java.util.List;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.ZNRecord;
-
-/**
- * Execute a list of combined queries. A combined query has the form {query}-->{resultName},
- * while the next query will have the previous result via the previous resultName.
- * 
- * */
-public class JsqlQueryListProcessor
-{
-  public static final String SEPARATOR = "-->";
-  private static Logger _logger = Logger.getLogger(JsqlQueryListProcessor.class);
-  
-  public static List<ZNRecord> executeQueryList(HelixDataAccessor accessor, String clusterName, List<String> combinedQueryList) throws Exception
-  {
-    ZNRecordQueryProcessor processor = new ZNRecordQueryProcessor();
-    DataAccessorBasedTupleReader tupleReader = new DataAccessorBasedTupleReader(accessor, clusterName);
-    List<ZNRecord> tempResult = null;
-    for(int i = 0; i < combinedQueryList.size(); i++)
-    {
-      String combinedQuery = combinedQueryList.get(i);
-      String query = combinedQuery;
-      String resultName =  "";
-      int pos = combinedQuery.indexOf(SEPARATOR);
-      if(pos < 0)
-      {
-        if(i <combinedQueryList.size() - 1)
-        {
-          _logger.error("Combined query " + combinedQuery + " " + i +" doe not contain " + SEPARATOR);
-        }
-      }
-      else
-      {
-        query = combinedQuery.substring(0, pos);
-        resultName =  combinedQuery.substring(pos + SEPARATOR.length()).trim(); 
-        if(resultName.length() == 0)
-        {
-          _logger.error("Combined query " + combinedQuery + " " + i + " doe not contain resultName");
-        }
-      }
-      tempResult = processor.execute(query, tupleReader);
-      if(resultName.length() > 0)
-      {
-        tupleReader.setTempTable(resultName, tempResult);
-      }
-    }
-    
-    return tempResult;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordJosqlFunctionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordJosqlFunctionHandler.java b/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordJosqlFunctionHandler.java
deleted file mode 100644
index 11c7412..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordJosqlFunctionHandler.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.josql;
-
-import java.util.Map;
-
-import org.josql.functions.AbstractFunctionHandler;
-
-import com.linkedin.helix.ZNRecord;
-
-
-public class ZNRecordJosqlFunctionHandler extends AbstractFunctionHandler
-{
-  public boolean hasSimpleField(ZNRecord record, String fieldName, String field)
-  {
-    if(!record.getSimpleFields().containsKey(fieldName))
-    {
-      return false;
-    }
-    return field.equals(record.getSimpleField(fieldName));
-  }
-  
-  public boolean hasListField(ZNRecord record, String fieldName, String field)
-  {
-    if(!record.getListFields().containsKey(fieldName))
-    {
-      return false;
-    }
-    return record.getListField(fieldName).contains(field);
-  }
-  
-  public boolean hasMapFieldValue(ZNRecord record, String fieldName, String mapKey, String mapValue)
-  {
-    if(!record.getMapFields().containsKey(fieldName))
-    {
-      return false;
-    }
-    if(record.getMapField(fieldName).containsKey(mapKey))
-    {
-      return record.getMapField(fieldName).get(mapKey).equals(mapValue);
-    }
-    return false;
-  }
-  
-  public boolean hasMapFieldKey(ZNRecord record, String fieldName, String mapKey)
-  {
-    if(!record.getMapFields().containsKey(fieldName))
-    {
-      return false;
-    }
-    return record.getMapField(fieldName).containsKey(mapKey);
-  }
-  
-  public String getMapFieldValue(ZNRecord record, String fieldName, String mapKey)
-  {
-    if(record.getMapFields().containsKey(fieldName))
-    {
-      return record.getMapField(fieldName).get(mapKey);
-    }
-    return null;
-  }
-  
-  public String getSimpleFieldValue(ZNRecord record, String key)
-  {
-    return record.getSimpleField(key);
-  }
-  
-  public ZNRecord getZNRecordFromMap(Map<String, ZNRecord> recordMap, String key)
-  {
-    return recordMap.get(key);
-  }
-  
-  public ZNRecord getZNRecordFromMap(Map<String, Map<String, ZNRecord>> recordMap, String key, String subKey)
-  {
-    return recordMap.get(key).get(subKey);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordQueryProcessor.java b/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordQueryProcessor.java
deleted file mode 100644
index 8f3b91f..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordQueryProcessor.java
+++ /dev/null
@@ -1,1401 +0,0 @@
-package com.linkedin.helix.josql;
-
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-
-import com.linkedin.helix.ZNRecord;
-
-import net.sf.jsqlparser.expression.*;
-import net.sf.jsqlparser.expression.StringValue;
-import net.sf.jsqlparser.expression.operators.arithmetic.*;
-import net.sf.jsqlparser.expression.operators.conditional.*;
-import net.sf.jsqlparser.expression.operators.relational.*;
-import net.sf.jsqlparser.parser.CCJSqlParserManager;
-import net.sf.jsqlparser.schema.Column;
-import net.sf.jsqlparser.schema.Table;
-import net.sf.jsqlparser.statement.*;
-import net.sf.jsqlparser.statement.select.*;
-
-class OrderByVisitorImpl implements OrderByVisitor
-{
-  boolean isAsc = true;
-  Expression expression = null;
-  
-  @Override
-  public void visit(OrderByElement orderBy)
-  {
-    isAsc = orderBy.isAsc();
-    expression = orderBy.getExpression();
-  }
-}
-
-class FromItemVisitorImpl implements FromItemVisitor
-{
-  public Map<Integer, Table> tables = new HashMap<Integer, Table>();
-  public Map<Integer, SelectVisitorImpl> subSelects = new HashMap<Integer, SelectVisitorImpl>();
-  public List<Join> joins = new ArrayList<Join>();
-  int fromItemIndex = 0;
-  
-  @Override
-  public void visit(Table table)
-  {
-    tables.put(fromItemIndex++, table);
-  }
-
-  @Override
-  public void visit(SubSelect subSelect)
-  {
-    SelectVisitorImpl sub = new SelectVisitorImpl();
-    subSelect.getSelectBody().accept(sub);
-    sub.alias = subSelect.getAlias();
-    subSelects.put(fromItemIndex++, sub);
-  }
-
-  @Override
-  public void visit(SubJoin subjoin)
-  {
-    throw new UnsupportedOperationException();
-  }
-}
-
-class FunctionImpl
-{
-  String name;
-  List<ExpressionVisitorImpl> expressionVisitors = new ArrayList<ExpressionVisitorImpl>();
-}
-
-class ExpressionVisitorImpl implements ExpressionVisitor
-{
-  Column column = null;
-  FunctionImpl function = null;
-  Object atomVal = null;
-  EqualsTo equalsTo = null;
-  AndExpression andExpression = null;
-  OrExpression orExpression = null;
-  MinorThan minorThan = null;
-  GreaterThan greaterThan = null;
-  Subtraction subtraction = null;
-  Addition addition = null;
-  Parenthesis parenthesis = null;
-  
-  Expression expression;
-  
-  ExpressionVisitorImpl(Expression expression)
-  {
-    this.expression = expression;
-  }
-  
-  @Override
-  public void visit(NullValue nullValue)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(Function f)
-  {
-    function = new FunctionImpl();
-    function.name = f.getName();
-    ExpressionList parameters = f.getParameters();
-    if(parameters != null)
-    {
-      @SuppressWarnings("unchecked")
-      List<Expression> expressions = parameters.getExpressions();
-      for(Expression e : expressions)
-      {
-        ExpressionVisitorImpl v = new ExpressionVisitorImpl(e);
-        e.accept(v);
-        function.expressionVisitors.add(v);
-      }
-    }
-  }
-
-  @Override
-  public void visit(InverseExpression inverseExpression)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(JdbcParameter jdbcParameter)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(DoubleValue doubleValue)
-  {
-    atomVal = Double.valueOf(doubleValue.getValue());
-  }
-
-  @Override
-  public void visit(LongValue longValue)
-  {
-    atomVal = Long.valueOf(longValue.getValue());
-  }
-
-  @Override
-  public void visit(DateValue dateValue)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(TimeValue timeValue)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(TimestampValue timestampValue)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(Parenthesis parenthesis)
-  {
-    this.parenthesis = parenthesis;
-  }
-
-  @Override
-  public void visit(StringValue stringValue)
-  {
-    atomVal = stringValue.getValue();
-  }
-
-  @Override
-  public void visit(Addition addition)
-  {
-    this.addition = addition;
-  }
-
-  @Override
-  public void visit(Division division)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(Multiplication multiplication)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(Subtraction subtraction)
-  {
-   this.subtraction = subtraction;
-  }
-
-  @Override
-  public void visit(AndExpression andExpression)
-  {
-    this.andExpression = andExpression;
-  }
-
-  @Override
-  public void visit(OrExpression orExpression)
-  {
-    this.orExpression = orExpression;
-  }
-
-  @Override
-  public void visit(Between between)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(EqualsTo equalsTo)
-  {
-    this.equalsTo = equalsTo;
-  }
-
-  @Override
-  public void visit(GreaterThan greaterThan)
-  {
-    this.greaterThan = greaterThan;
-  }
-
-  @Override
-  public void visit(GreaterThanEquals greaterThanEquals)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(InExpression inExpression)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(IsNullExpression isNullExpression)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(LikeExpression likeExpression)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(MinorThan minorThan)
-  {
-    this.minorThan = minorThan;
-  }
-
-  @Override
-  public void visit(MinorThanEquals minorThanEquals)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(NotEqualsTo notEqualsTo)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(Column tableColumn)
-  {
-    column = tableColumn;
-  }
-
-  @Override
-  public void visit(SubSelect subSelect)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(CaseExpression caseExpression)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(WhenClause whenClause)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(ExistsExpression existsExpression)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(AllComparisonExpression allComparisonExpression)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(AnyComparisonExpression anyComparisonExpression)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(Concat concat)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(Matches matches)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(BitwiseAnd bitwiseAnd)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(BitwiseOr bitwiseOr)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-
-  @Override
-  public void visit(BitwiseXor bitwiseXor)
-  {
-    throw new UnsupportedOperationException(); 
-  }
-}
-
-class SelectItemVisitorImpl implements SelectItemVisitor
-{
-  String alias;
-  boolean shouldSelectAllCols = false;
-  Expression expression = null;
-  
-  @Override
-  public void visit(AllColumns allColumns)
-  {
-    shouldSelectAllCols = true;
-  }
-
-  @Override
-  public void visit(AllTableColumns allTableColumns)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void visit(SelectExpressionItem selectExpressionItem)
-  {
-    alias = selectExpressionItem.getAlias();
-    expression = selectExpressionItem.getExpression();
-  }
-}
-
-class SelectVisitorImpl implements SelectVisitor
-{
-  String alias = null;
-  public List<SelectItemVisitorImpl> selectItemVisitors = new ArrayList<SelectItemVisitorImpl>();
-  public FromItemVisitorImpl fromItemVisitor = new FromItemVisitorImpl();
-  public List<OrderByVisitorImpl> orderByVisitors = null;
-  public ExpressionVisitorImpl whereVisitor = null;
-  public List<Column> groupByColumnReferences = null;
-  public Union union = null;
-  public Limit limit = null;
-  
-  @SuppressWarnings("unchecked")
-  @Override
-  public void visit(PlainSelect plainSelect)
-  {
-    List<SelectItem> selectItems = plainSelect.getSelectItems();
-    for(SelectItem selectItem : selectItems)
-    {
-      SelectItemVisitorImpl selectItemVisitor = new SelectItemVisitorImpl();
-      selectItem.accept(selectItemVisitor);
-      selectItemVisitors.add(selectItemVisitor);
-    }
-    
-    plainSelect.getFromItem().accept(fromItemVisitor);
-    if (plainSelect.getJoins() != null)
-    {
-      for (Iterator<Join> joinsIt = plainSelect.getJoins().iterator(); joinsIt.hasNext();)
-      {
-        Join join = joinsIt.next();
-        if(join.getUsingColumns() == null || join.getUsingColumns().size() == 0)
-        {
-          throw new UnsupportedOperationException("'using' directive required for join");
-        }
-        if(join.getUsingColumns().size() != 2)
-        {
-          throw new UnsupportedOperationException("'using' directive for join requires exactly two columns");
-        }
-        join.getRightItem().accept(fromItemVisitor);
-        fromItemVisitor.joins.add(join);
-      }
-    }
-
-    Expression where = plainSelect.getWhere();
-    if(where != null)
-    {
-      whereVisitor = new ExpressionVisitorImpl(where);
-      where.accept(whereVisitor);
-    }
-    
-    groupByColumnReferences = plainSelect.getGroupByColumnReferences();
-
-    List<OrderByElement> orderByElements = plainSelect.getOrderByElements();
-    if(orderByElements != null)
-    {
-      orderByVisitors = new ArrayList<OrderByVisitorImpl>();
-      for(OrderByElement e : orderByElements)
-      {
-        OrderByVisitorImpl impl = new OrderByVisitorImpl();
-        e.accept(impl);
-        orderByVisitors.add(impl);
-      }
-    }
-    
-    limit = plainSelect.getLimit();
-  }
-
-  @Override
-  public void visit(Union union)
-  {
-    this.union = union;
-  }
-}
-
-public class ZNRecordQueryProcessor
-{
-  public interface ZNRecordTupleReader
-  {
-    List<ZNRecord> get(String path) throws Exception;
-    void reset();
-  }
-
-  private String replaceExplodeFields(String sql, String functionName)
-  {
-    String ret = sql;
-    ret.replaceAll("`", ""); 
-    
-    Pattern pattern = Pattern.compile(functionName + "\\((.*?)\\)");
-    Matcher m = pattern.matcher(ret);
-    while (m.find())
-    {
-      String args[] = m.group(1).split(",");
-      if(args.length  == 0) 
-      { 
-        throw new IllegalArgumentException(functionName + " requires at least 2 parameters"); 
-      }
-      
-      String str = functionName + ".";
-      String s = "";
-      for (int i = 0; i < args.length; i++)
-      {
-        s += args[i].trim().replaceAll("`", "");
-        if(i != args.length-1)
-        {
-          s += "_____";
-        }
-      }
-      str += "`" + s + "`";
-
-      ret = ret.replaceFirst(functionName + "\\(.*?\\)", str);
-    }
-    return ret;
-  }
-  
-  public List<ZNRecord> execute(String sql, ZNRecordTupleReader tupleReader) throws Exception
-  {
-    sql = replaceExplodeFields(sql, "explodeList");
-    sql = replaceExplodeFields(sql, "explodeMap");
-    
-    CCJSqlParserManager pm = new CCJSqlParserManager();
-    Statement statement = pm.parse(new StringReader(sql));
-    if (statement instanceof Select)
-    {
-      Select selectStatement = (Select) statement;
-      SelectVisitorImpl v = new SelectVisitorImpl();
-      selectStatement.getSelectBody().accept(v);
-      return executeSelect(v, tupleReader);
-    }
-    
-    throw new UnsupportedOperationException(statement.toString());
-  }
-  
-  private void applyLimit(Limit limit, List<ZNRecord> input)
-  {
-    if(limit != null)
-    {
-      int rowCount = (int) limit.getRowCount();
-      if(input != null && rowCount < input.size())
-      {
-        input.subList(rowCount, input.size()).clear();
-      }
-    }
-  }
-  
-  private Integer compareRecords(ZNRecord o1, ZNRecord o2, ExpressionVisitorImpl expressionVisitor)
-  {
-    Object val1 = evalExpression(expressionVisitor, o1);
-    Object val2 = evalExpression(expressionVisitor, o2);
-    
-    if(val1 == null || val2 == null) 
-    {
-      return null;
-    }
-    
-    if(val1 instanceof Long && val2 instanceof Long)
-    {
-      return ((Long) val1).compareTo((Long)val2);
-    }
-    else if(val1 instanceof Double && val2 instanceof Double)
-    {
-      return ((Double)val1).compareTo((Double)val2);
-    }
-    else if(val1 instanceof String && val2 instanceof String)
-    {
-      return ((String)val1).compareTo((String) val2);
-    }
-    
-    return null;
-  }
-  
-  private void applyOrderBy(final List<OrderByVisitorImpl> orderByVisitors, final List<ZNRecord> result)
-  {
-    class RecordComparator implements Comparator<ZNRecord>
-    {
-      @Override
-      public int compare(ZNRecord o1, ZNRecord o2)
-      {
-        for(OrderByVisitorImpl visitor : orderByVisitors)
-        {
-          ExpressionVisitorImpl expressionVisitor = new ExpressionVisitorImpl(visitor.expression);
-          visitor.expression.accept(expressionVisitor);
-          Integer compareResult = compareRecords(o1, o2, expressionVisitor);
-
-          if(compareResult == null) 
-          {
-            return -1; //not comparable. return arbitrary "less" value
-          }
-          
-          if(compareResult != 0)
-          {
-            return (visitor.isAsc ? compareResult : -compareResult);
-          }
-        }
-        
-        return 0;
-      }
-    }
-    
-    if(orderByVisitors != null)
-    {
-      Collections.sort(result, new RecordComparator());
-    }
-  }
-  
-  private List<ZNRecord> executePlainSelect(SelectVisitorImpl v, ZNRecordTupleReader tupleReader) throws Exception
-  {
-    SourceTableList sourceTables = getSourceTables(v, tupleReader);
-
-    SourceTable joinResult = getJoin(v.fromItemVisitor.joins, sourceTables, 0, sourceTables.size()-1);
-    List<ZNRecord> result = joinResult.tuples;
-    
-    //XXX: hack needed for now to allow aliased/computed columns to be used in where and group by
-    //figure out a better way
-    List<ZNRecord> tempProjection = getProjection(v, result);
-    if(tempProjection != null)
-    {
-      for(int i = 0; i < result.size(); i++)
-      {
-        ZNRecord record = result.get(i);
-        if(record instanceof ZNRecordSet)
-        {
-          ((ZNRecordSet)record).addRecord("xxx_random_"+System.currentTimeMillis(), tempProjection.get(i));
-        }
-        else
-        {
-          record.merge(tempProjection.get(i));
-        }
-      }
-    }
-    
-    if(v.whereVisitor != null)
-    {
-      result = applyWhereClause(v, result);
-    }
-
-    if(v.groupByColumnReferences != null)
-    {
-      result = applyGroupBy(v, result);
-    }
-    else
-    {
-      ZNRecord aggregate = getAggregateColumns(v, result);
-      if(aggregate != null)
-      {
-        result = Arrays.asList(aggregate);
-      }
-      else
-      {
-        List<ZNRecord> projection = getProjection(v, result);
-        if(projection != null)
-        {
-          result = projection;
-        }
-        
-        applyOrderBy(v.orderByVisitors, result);
-        applyLimit(v.limit, result);
-      }
-    }
-    
-    return result;
-  }
-  
-  private SourceTable getJoin(List<Join> joins,  SourceTableList sourceTables, int fromIndex, int toIndex)
-  {
-    assert(joins.size() == sourceTables.size()-1);
-    if(fromIndex == toIndex)
-    {
-      return sourceTables.get(fromIndex);
-    }
-    else
-    {
-      SourceTable leftTable = sourceTables.get(fromIndex);
-      List<Column> usingColumns = joins.get(fromIndex).getUsingColumns();
-      Column leftCol = null;
-      Column rightCol = null;
-      if(usingColumns.get(0).getTable().getName().equals(leftTable.name))
-      {
-        leftCol = usingColumns.get(0);
-        rightCol = usingColumns.get(1);
-      }
-      else
-      {
-        rightCol = usingColumns.get(0);
-        leftCol = usingColumns.get(1);
-      }
-      SourceTable rightTable = getJoin(joins, sourceTables, fromIndex+1, toIndex);
-      return getJoin(leftCol, rightCol, leftTable, rightTable);
-    }
-  }
-
-  private SourceTable getJoin(Column leftCol,
-                              Column rightCol,
-                              SourceTable leftTable, 
-                              SourceTable rightTable)
-  {
-    if(leftCol.getTable() == null || leftCol.getTable().equals(""))
-    {
-      throw new IllegalArgumentException(leftCol.toString() + " used in join should be fully qualified");
-    }
-    if(rightCol.getTable() == null || rightCol.getTable().equals(""))
-    {
-      throw new IllegalArgumentException(rightCol.toString() + " used in join should be fully qualified");
-    }
-
-
-    Map<String, Set<ZNRecord>> rightMap = new HashMap<String, Set<ZNRecord>>();
-    for(ZNRecord record : rightTable.tuples)
-    {
-      String rightVal = getSimpleColumn(record, rightCol);
-      if(rightVal != null)
-      {
-        if(!rightMap.containsKey(rightVal))
-        {
-          rightMap.put(rightVal, new HashSet<ZNRecord>());
-        }
-        rightMap.get(rightVal).add(record);
-      }
-    }
-    
-    List<ZNRecord> tuples = new ArrayList<ZNRecord>();
-    for(ZNRecord leftRecord : leftTable.tuples)
-    {
-      String leftVal = getSimpleColumn(leftRecord, leftCol);
-      if(leftVal != null && rightMap.containsKey(leftVal))
-      {
-        Set<ZNRecord> rightRecords = rightMap.get(leftVal);
-        for(ZNRecord rightRecord : rightRecords)
-        {
-          if(rightRecord instanceof ZNRecordSet)
-          {
-            ((ZNRecordSet)rightRecord).addRecord(leftTable.name, leftRecord);
-            tuples.add(rightRecord);
-          }
-          else
-          {
-            ZNRecordSet recordSet = new ZNRecordSet("");
-            recordSet.addRecord(rightTable.name, rightRecord);
-            recordSet.addRecord(leftTable.name, leftRecord);
-            tuples.add(recordSet);
-          }
-        }
-      }
-    }
-
-    return new SourceTable("xxx_random_" + System.currentTimeMillis(), 0, tuples);
-  }
-
-  private Integer evalComparisonExpression(BinaryExpression expression, ZNRecord record)
-  {
-    Object leftVal = evalExpression(expression.getLeftExpression(), record);
-    Object rightVal = evalExpression(expression.getRightExpression(), record);
-    if (leftVal != null && rightVal != null)
-    {
-      if (leftVal instanceof String && rightVal instanceof String)
-      {
-        return ((String) leftVal).compareTo((String) rightVal);
-      }
-      else if (leftVal instanceof Long || rightVal instanceof Long || 
-          leftVal instanceof Double || rightVal instanceof Double)
-      {
-        Object l = toNum(leftVal);
-        Object r = toNum(rightVal);
-        if(l instanceof Long && r instanceof Long)
-        {
-          return ((Long) l).compareTo((Long) r);
-        }
-        else if (l instanceof Double && l instanceof Double)
-        {
-          return ((Double) l).compareTo((Double) r);
-        }
-      }
-      else
-      {
-        throw new IllegalArgumentException("Left and right parameters of comparison cannot be compared");
-      }
-    }
-
-    return null;
-  }
-  
-  private Object evalExpression(ExpressionVisitorImpl visitor, ZNRecord record)
-  {
-    if(visitor.column != null)
-    {
-      return getSimpleColumn(record, visitor.column);
-    }
-    else if(visitor.function != null)
-    {
-      return evalFunction(record, visitor.function);
-    }
-    else if(visitor.atomVal != null)
-    {
-      return visitor.atomVal;
-    }
-    else if(visitor.equalsTo != null)
-    {
-      Integer ret = evalComparisonExpression(visitor.equalsTo, record);
-      return Boolean.valueOf(ret != null && ret == 0);
-    }
-    else if(visitor.minorThan != null)
-    {
-      Integer ret = evalComparisonExpression(visitor.minorThan, record);
-      return Boolean.valueOf(ret != null && ret < 0);
-    }
-    else if(visitor.greaterThan != null)
-    {
-      Integer ret = evalComparisonExpression(visitor.greaterThan, record);
-      return Boolean.valueOf(ret != null && ret > 0);
-    }
-    else if(visitor.subtraction != null || visitor.addition != null)
-    {
-      Expression leftExpression = (visitor.subtraction != null ? visitor.subtraction.getLeftExpression() : visitor.addition.getLeftExpression());
-      Expression rightExpression = (visitor.subtraction != null ? visitor.subtraction.getRightExpression() : visitor.addition.getRightExpression());
-      Object leftVal = toNum(evalExpression(leftExpression, record));
-      Object rightVal = toNum(evalExpression(rightExpression, record));
-      if(leftVal != null && rightVal != null)
-      {
-        int multiplier = (visitor.subtraction != null ? -1 : 1);
-        if (leftVal instanceof Long && rightVal instanceof Long)
-        {
-          return Long.valueOf((Long) leftVal + (multiplier * (Long) rightVal));
-        }
-        else if (leftVal instanceof Double && rightVal instanceof Double)
-        {
-          return Double.valueOf((Double) leftVal + (multiplier * (Double) rightVal));
-        }
-      }
-    }
-    else if(visitor.andExpression != null)
-    {
-      Object leftVal = evalExpression(visitor.andExpression.getLeftExpression(), record);
-      Object rightVal = evalExpression(visitor.andExpression.getRightExpression(), record);
-      if(leftVal != null && rightVal != null)
-      {
-        if(!(leftVal instanceof Boolean) || !(rightVal instanceof Boolean))
-        {
-          throw new IllegalArgumentException("AND clause parameters don't evaluate to boolean");
-        }
-        return Boolean.valueOf(((Boolean)leftVal) && ((Boolean)rightVal));
-      }
-      else
-      {
-        return Boolean.FALSE;
-      }
-    }
-    else if(visitor.parenthesis != null)
-    {
-      return evalExpression(visitor.parenthesis.getExpression(), record);
-    }
-    
-    return null;
-  }
-  
-  private Object evalExpression(Expression exp, ZNRecord record)
-  {
-    //TODO: Replace logic here with generic expression evaluation
-    ExpressionVisitorImpl visitor = new ExpressionVisitorImpl(exp);
-    exp.accept(visitor);
-    return evalExpression(visitor, record);
-  }
-  
-  private List<ZNRecord> applyWhereClause(SelectVisitorImpl v,
-                                          List<ZNRecord> input)
-  {
-    List<ZNRecord> result = new ArrayList<ZNRecord>();
-    for (ZNRecord record : input)
-    {
-      Object evalResult = evalExpression(v.whereVisitor, record);
-      if(evalResult == null || !(evalResult instanceof Boolean))
-      {
-        throw new UnsupportedOperationException("Unsupported predicate in where clause:" + v.whereVisitor.expression);
-      }
-      else if((Boolean)evalResult)
-      {
-        result.add(record);
-      }
-    }
-
-    return result;
-  }
-
-  private List<ZNRecord> executeUnion(SelectVisitorImpl v, ZNRecordTupleReader tupleReader) throws Exception
-  {
-    @SuppressWarnings("unchecked")
-    List<PlainSelect> plainSelects = v.union.getPlainSelects();
-    List<ZNRecord> result = new ArrayList<ZNRecord>();
-    for(PlainSelect select : plainSelects)
-    {
-      SelectVisitorImpl visitor = new SelectVisitorImpl();
-      select.accept(visitor);
-      result.addAll(executePlainSelect(visitor, tupleReader));
-    }
-    
-    applyLimit(v.limit, result);
-    return result;
-  }
-  
-  private List<ZNRecord> executeSelect(SelectVisitorImpl v, ZNRecordTupleReader tupleReader) throws Exception
-  {
-    if(v.union != null)
-    {
-      return executeUnion(v, tupleReader);
-    }
-    else
-    {
-      return executePlainSelect(v, tupleReader);
-    }
-  }
-
-  private String getSimpleColumn(ZNRecord record, Column column)
-  {
-    if(record == null)
-    {
-      return null;
-    }
-    
-    String ret = record.getSimpleField(column.getWholeColumnName());
-    if(ret == null)
-    {
-      ret = record.getSimpleField(column.getColumnName());
-    }
-    if(ret == null)
-    {
-      Map<String, String> simpleFields = record.getSimpleFields();
-      for(String key : simpleFields.keySet())
-      {
-        if(key.endsWith("." + column.getColumnName()))
-        {
-          ret = simpleFields.get(key);
-        }
-      }
-    }
-    return ret;
-  }
-  
-  private Map<Map<Column, String>, List<ZNRecord>> formGroups(List<Column> groupByCols, List<ZNRecord> input)
-  {    
-    Map<Map<Column, String>, List<ZNRecord>> map = new HashMap<Map<Column,String>, List<ZNRecord>>();
-    for(ZNRecord record : input)
-    {
-      Map<Column, String> group = new HashMap<Column, String>();
-      for(Column col : groupByCols)
-      {
-        String val = getSimpleColumn(record, col);
-        if(val != null)
-        {
-          group.put(col, val);
-        }
-      }
-      if(group.size() != 0)
-      {
-        if(!map.containsKey(group))
-        {
-          map.put(group, new ArrayList<ZNRecord>());
-        }
-        map.get(group).add(record);
-      }
-    }
-    return map;
-  }
-  
-  //TODO: Validate that only group by keys and aggregate functions are in select items
-  private List<ZNRecord> applyGroupBy(SelectVisitorImpl v, List<ZNRecord> input) throws Exception
-  {
-    List<ZNRecord> result = new ArrayList<ZNRecord>();
-    Map<Map<Column, String>, List<ZNRecord>> groups = formGroups(v.groupByColumnReferences, input);
-    for(Map<Column, String> group : groups.keySet())
-    {
-      List<ZNRecord> tuples = groups.get(group);
-      ZNRecord aggregate = getAggregateColumns(v, tuples);
-      if(aggregate != null)
-      {
-        for(Column col : group.keySet())
-        {
-          String val = group.get(col);
-          aggregate.setSimpleField(col.getWholeColumnName(), val);
-        }
-        result.add(aggregate);
-      }
-    }
-    
-    return result;
-  }
-  
-  //TODO: Generalize this
-  private ZNRecord getAggregateColumns(SelectVisitorImpl v, List<ZNRecord> input) throws Exception
-  {
-    for(SelectItemVisitorImpl impl : v.selectItemVisitors)
-    {
-      if(impl.expression != null)
-      {
-        ExpressionVisitorImpl expressionVisitor = new ExpressionVisitorImpl(impl.expression); 
-        impl.expression.accept(expressionVisitor);
-        if(expressionVisitor.function != null && (expressionVisitor.function.name.equalsIgnoreCase("max") 
-            || expressionVisitor.function.name.equalsIgnoreCase("min")))
-        {
-          if (expressionVisitor.function.expressionVisitors.size() != 1)
-          {
-            throw new IllegalArgumentException(expressionVisitor.function.name + " needs one argument");
-          }
-          
-          ZNRecord min = null;
-          ZNRecord max = null;
-          for(ZNRecord record : input)
-          {
-            if(min == null || compareRecords(record, min, expressionVisitor.function.expressionVisitors.get(0)) < 0)
-            {
-              min = record;
-            }
-            if(max == null || compareRecords(record, max, expressionVisitor.function.expressionVisitors.get(0)) > 0)
-            {
-              max = record;
-            }
-          }
-          
-          Object val = null;
-          if(expressionVisitor.function.name.equalsIgnoreCase("max"))
-          {
-           val = evalExpression(expressionVisitor.function.expressionVisitors.get(0), max); 
-          }
-          else
-          {
-            val = evalExpression(expressionVisitor.function.expressionVisitors.get(0), min);
-          }
-          ZNRecord ret = new ZNRecord("");
-          String columnName = (impl.alias != null ? impl.alias : expressionVisitor.function.name);
-          if(val != null)
-          {
-            ret.setSimpleField(columnName, ""+val);
-          }
-          return ret;
-        }
-      }
-    }
-    return null;
-  }
-
-  //return null if all columns are projected
-  private  List<ZNRecord> getProjection(SelectVisitorImpl v, List<ZNRecord> input) throws Exception
-  {
-    List<ZNRecord> output = new ArrayList<ZNRecord>();
-    
-    for (ZNRecord record : input)
-    {
-      ZNRecord projection = new ZNRecord("");
-      for(SelectItemVisitorImpl impl : v.selectItemVisitors)
-      {
-        if(impl.shouldSelectAllCols)
-        {
-          return null;
-        }
-        else if(impl.expression != null)
-        {
-          ExpressionVisitorImpl expressionVisitor = new ExpressionVisitorImpl(impl.expression); 
-          impl.expression.accept(expressionVisitor);
-          String columnName = impl.alias;
-          Object val = evalExpression(expressionVisitor, record);
-          if(expressionVisitor.column != null && columnName == null)
-          {
-            columnName = expressionVisitor.column.getWholeColumnName();
-          }
-          else if(expressionVisitor.function != null && columnName == null)
-          {
-            columnName = expressionVisitor.function.name;
-          }
-          else if(columnName == null)
-          {
-            columnName = impl.expression.toString();
-          }
-          if(val != null)
-          {
-            projection.setSimpleField(columnName, val.toString());
-          }
-        }
-      }
-
-      output.add(projection);
-    }
-    return output;
-  }
-
-  private Object getParamValue(FunctionImpl function, int index, ZNRecord record)
-  {
-    if(function.expressionVisitors.get(index).function != null)
-    {
-      return evalFunction(record, function.expressionVisitors.get(index).function);
-    }
-    else if(function.expressionVisitors.get(index).column != null)
-    {
-      return getSimpleColumn(record, function.expressionVisitors.get(index).column);
-    }
-    else
-    {
-      return function.expressionVisitors.get(index).atomVal;
-    }
-  }
-  
-  private Object evalFunction(ZNRecord record, FunctionImpl function)
-  {
-    if(function.name.equalsIgnoreCase("concat"))
-    {
-      if (function.expressionVisitors.size() != 2)
-      {
-        throw new IllegalArgumentException("concat() needs 2 arguments");
-      }
-      Object arg1 = getParamValue(function, 0, record);
-      Object arg2 = getParamValue(function, 1, record);
-      String ret = (arg1 != null ? arg1.toString() : "");
-      ret += (arg2 != null ? arg2.toString() : "");
-      return ret;
-    }
-    else if(function.name.equalsIgnoreCase("to_number"))
-    {
-      if (function.expressionVisitors.size() != 1)
-      {
-        throw new IllegalArgumentException("to_number() needs 1 argument");
-      }
-      Object param = getParamValue(function, 0, record);
-      return toNum(param);
-    }
-
-    return null;
-  }
-
-  private Object toNum(Object o)
-  {
-    if(o == null)
-    {
-      return null;
-    }
-    
-    if(o instanceof String)
-    {
-      String s = (String) o;
-      if(s.contains("."))
-      {
-        try {return Double.parseDouble(s);}catch(Exception e){}
-      }
-      else
-      {
-        try {return Long.parseLong(s);}catch(Exception e){}
-      }
-    }
-    else if(o instanceof Long || o instanceof Double)
-    {
-      return o;
-    }
-    
-    return null;
-  }
- 
-  private SourceTableList getSourceTables(SelectVisitorImpl v, ZNRecordTupleReader tupleReader) throws Exception
-  {
-    SourceTableList sourceTables = new SourceTableList();
-    
-    for(int fromItemIndex : v.fromItemVisitor.tables.keySet())
-    {
-      Table t = v.fromItemVisitor.tables.get(fromItemIndex);
-      String schema = t.getSchemaName();
-      if(schema != null)
-      {
-        schema = schema.replaceAll("`", "");
-      }
-      String tableName = t.getName();
-      tableName = tableName.replaceAll("`", "");
-      List<ZNRecord> tuples = null;
-      String key = null;
-      if(schema != null && (schema.equalsIgnoreCase("explodeList") || schema.equalsIgnoreCase("explodeMap")))
-      {
-        tuples = getExplodedTable(schema, tableName, tupleReader);
-        key = (t.getAlias() != null ? t.getAlias() : "xxx_random_" + System.currentTimeMillis());
-      }
-      else
-      {
-        tuples = tupleReader.get(tableName);
-        key = (t.getAlias() != null ? t.getAlias() : tableName);
-      }
-      sourceTables.add(new SourceTable(key, fromItemIndex, tuples));
-    }
-    
-    for(int fromItemIndex : v.fromItemVisitor.subSelects.keySet())
-    {
-      SelectVisitorImpl subSelect = v.fromItemVisitor.subSelects.get(fromItemIndex);
-      List<ZNRecord> tuples = executeSelect(subSelect, tupleReader);
-      String key = (subSelect.alias != null ? subSelect.alias : "xxx_random_" + System.currentTimeMillis());
-      sourceTables.add(new SourceTable(key, fromItemIndex, tuples));
-    }
-    
-    //XXX: Special case of ID column
-    for(SourceTable table : sourceTables)
-    {
-      for(ZNRecord record : table.tuples)
-      {
-        if(!record.getSimpleFields().containsKey("id"))
-        {
-          record.setSimpleField("id", record.getId());
-        }
-      }
-    }
-    
-    return sourceTables;
-  }
-
-  private List<ZNRecord> getExplodedTable(String function,
-                                          String arg,
-                                          ZNRecordTupleReader tupleReader) throws Exception
-  {
-    List<String> args = Arrays.asList(arg.split("_____"));
-    String originalTable = args.get(0);
-    List<String> columns = (args.size() > 1 ? args.subList(1, args.size()) : null);
-    if(function.equalsIgnoreCase("explodeList"))
-    {
-      return explodeListFields(originalTable, columns, tupleReader);
-    }
-    else if(function.equalsIgnoreCase("explodeMap"))
-    {
-      return explodeMapFields(originalTable, columns, tupleReader);
-    }
-    
-    throw new IllegalArgumentException(function + " is not supported");
-  }
-
-  private List<ZNRecord> explodeMapFields(String tableName,
-                                          Collection<String> mapColumns,
-                                          ZNRecordTupleReader tupleReader) throws Exception
-  {
-    List<ZNRecord> table = tupleReader.get(tableName);
-    List<ZNRecord> ret = new ArrayList<ZNRecord>();
-    for(ZNRecord record : table)
-    {
-      Collection<String> cols = (mapColumns == null ? record.getMapFields().keySet() : mapColumns);
- 
-      for(String mapCol : cols) //for each map field
-      {
-        if(record.getMapFields().containsKey(mapCol))
-        {
-          ZNRecord newRecord = new ZNRecord("");
-          newRecord.getSimpleFields().putAll(record.getSimpleFields());
-          newRecord.setSimpleField("mapField", mapCol);
-          Map<String, String> mapField = record.getMapField(mapCol);
-          for(String key : mapField.keySet())
-          {
-            newRecord.setSimpleField(key, mapField.get(key));
-          }
-          ret.add(newRecord);
-        }
-      }
-    }
-    
-    return ret;
-  }
-
-  private List<ZNRecord> explodeListFields(String tableName,
-                                           Collection<String> listColumns,
-                                           ZNRecordTupleReader tupleReader) throws Exception
-  {
-    List<ZNRecord> table = tupleReader.get(tableName);
-    List<ZNRecord> ret = new ArrayList<ZNRecord>();
-    for(ZNRecord record : table)
-    {
-      Collection<String> cols = (listColumns == null ? record.getListFields().keySet() : listColumns);
-      for(String listCol : cols) //for each list field
-      {
-        if(record.getListFields().containsKey(listCol))
-        {
-          List<String> listField = record.getListField(listCol);
-          for(int listIndex = 0; listIndex < listField.size(); listIndex++)
-          {
-            String val = listField.get(listIndex);
-            ZNRecord newRecord = new ZNRecord("");
-            newRecord.getSimpleFields().putAll(record.getSimpleFields());
-            newRecord.setSimpleField("listField", listCol);
-            newRecord.setSimpleField("listIndex", ""+listIndex);
-            newRecord.setSimpleField("listVal", val);
-            ret.add(newRecord);
-          }
-        }
-      }
-    }
-    return ret;
-  }
-}
-
-class ZNRecordSet extends ZNRecord
-{
-  private HashMap<String, ZNRecord> _records = new HashMap<String, ZNRecord>();
-
-  public ZNRecordSet(String id)
-  {
-    super(id);
-    // TODO Auto-generated constructor stub
-  }
-  
-  public void addRecord(String tableName, ZNRecord record)
-  {
-    _records.put(tableName, record);
-  }
-  
-  @Override
-  public String getSimpleField(String key)
-  {
-    if(key.contains("."))
-    {
-      String tmp[] = key.split("\\.");
-      if(_records.containsKey(tmp[0]))
-      {
-        return _records.get(tmp[0]).getSimpleField(tmp[1]);
-      }
-    }
-    else
-    {
-      for(ZNRecord record : _records.values())
-      {
-        if(record.getSimpleFields().containsKey(key))
-        {
-          return record.getSimpleField(key);
-        }
-      }
-    }
-    
-    return null;
-  }
-  
-  @Override
-  public void setSimpleField(String key, String val)
-  {
-    if(key.contains("."))
-    {
-      String tmp[] = key.split("\\.");
-      if(_records.containsKey(tmp[0]))
-      {
-        _records.get(tmp[0]).setSimpleField(tmp[1], val);
-      }
-      else
-      {
-        ZNRecord record = new ZNRecord("");
-        record.setSimpleField(tmp[1], val);
-        addRecord(tmp[0], record);
-      }
-    }
-    else
-    {
-      if(_records.size() == 0)
-      {
-        ZNRecord record = new ZNRecord("");
-        record.setSimpleField(key, val);
-        addRecord("xxx_random_" + System.currentTimeMillis(), record);
-      }
-      else
-      {
-        _records.get(_records.keySet().iterator().next()).setSimpleField(key, val);
-      }
-    }
-  }
-  
-  @Override
-  public String toString()
-  {
-    return "@" + _records.toString() + "@";
-  }
-}
-
-class SourceTableList implements Iterable<SourceTable>
-{
-  Map<String, SourceTable> _nameMap = new HashMap<String, SourceTable>();
-  Map<Integer, SourceTable> _indexMap = new HashMap<Integer, SourceTable>();
-  
-  public void add(SourceTable table)
-  {
-    _nameMap.put(table.name, table);
-    _indexMap.put(table.fromItemIndex, table);
-  }
-  
-  public int size()
-  {
-    return _nameMap.size();
-  }
-  
-  public SourceTable get(int index)
-  {
-    if(index >= _indexMap.size())
-    {
-      throw new IndexOutOfBoundsException();
-    }
-    return _indexMap.get(index);
-  }
-  
-  public SourceTable get(String name)
-  {
-    return _nameMap.get(name);
-  }
-  
-  public boolean containsKey(String name)
-  {
-    return _nameMap.containsKey(name);
-  }
-  
-  public Iterator<SourceTable> iterator()
-  {
-    return _nameMap.values().iterator();
-  }
-  
-  @Override
-  public String toString()
-  {
-    return _nameMap.toString();
-  }
-}
-
-class SourceTable
-{
-  String name;
-  int fromItemIndex;
-  List<ZNRecord> tuples;
-  
-  public SourceTable(String name, int fromItemIndex, List<ZNRecord> tuples)
-  {
-    this.name = name;
-    this.fromItemIndex = fromItemIndex;
-    this.tuples = tuples;
-  }
-  
-  @Override
-  public String toString()
-  {
-    return name + "->" + tuples;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordRow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordRow.java b/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordRow.java
deleted file mode 100644
index 08a3022..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/ZNRecordRow.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.josql;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.linkedin.helix.ZNRecord;
-
-/**
- * A Normalized form of ZNRecord 
- * */
-public class ZNRecordRow
-{
-  // "Field names" in the flattened ZNRecord
-  public static final String SIMPLE_KEY = "simpleKey";
-  public static final String SIMPLE_VALUE = "simpleValue";
-  
-  public static final String LIST_KEY = "listKey";
-  public static final String LIST_VALUE = "listValue";
-  public static final String LIST_VALUE_INDEX = "listValueIndex";
-  
-  public static final String MAP_KEY = "mapKey";
-  public static final String MAP_SUBKEY = "mapSubKey";
-  public static final String MAP_VALUE = "mapValue";
-  public static final String ZNRECORD_ID = "recordId";
-  // ZNRECORD path ? 
-  
-  
-  final Map<String, String> _rowDataMap = new HashMap<String, String>();
-  
-  public ZNRecordRow()
-  {
-    _rowDataMap.put(SIMPLE_KEY, "");
-    _rowDataMap.put(SIMPLE_VALUE, "");
-    _rowDataMap.put(LIST_KEY, "");
-    _rowDataMap.put(LIST_VALUE, "");
-    _rowDataMap.put(LIST_VALUE_INDEX, "");
-    _rowDataMap.put(MAP_KEY, "");
-    _rowDataMap.put(MAP_SUBKEY, "");
-    _rowDataMap.put(MAP_VALUE, "");
-    _rowDataMap.put(ZNRECORD_ID, "");
-  }
-  
-  public String getField(String rowField)
-  {
-    return _rowDataMap.get(rowField);
-  }
-  
-  public void putField(String fieldName, String fieldValue)
-  {
-    _rowDataMap.put(fieldName, fieldValue);
-  }
-  public String getListValueIndex()
-  {
-    return getField(LIST_VALUE_INDEX);
-  }
-  public String getSimpleKey()
-  {
-    return getField(SIMPLE_KEY);
-  }
-  
-  public String getSimpleValue()
-  {
-    return getField(SIMPLE_VALUE);
-  }
-  
-  public String getListKey()
-  {
-    return getField(LIST_KEY);
-  }
-  
-  public String getListValue()
-  {
-    return getField(LIST_VALUE);
-  }
-  
-  public String getMapKey()
-  {
-    return getField(MAP_KEY);
-  }
-  
-  public String getMapSubKey()
-  {
-    return getField(MAP_SUBKEY);
-  }
-  
-  public String getMapValue()
-  {
-    return getField(MAP_VALUE);
-  }
-  
-  public String getRecordId()
-  {
-    return getField(ZNRECORD_ID);
-  }
-  
-  /* Josql function handlers */
-  public static String getField(ZNRecordRow row, String rowField)
-  {
-    return row.getField(rowField);
-  }
-  
-  public static List<ZNRecordRow> convertSimpleFields(ZNRecord record)
-  {
-    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
-    for(String key : record.getSimpleFields().keySet())
-    {
-      ZNRecordRow row = new ZNRecordRow();
-      row.putField(ZNRECORD_ID, record.getId());
-      row.putField(SIMPLE_KEY, key);
-      row.putField(SIMPLE_VALUE, record.getSimpleField(key));
-      result.add(row);
-    }
-    return result;
-  }
-  
-  public static List<ZNRecordRow> convertListFields(ZNRecord record)
-  {
-    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
-    for(String key : record.getListFields().keySet())
-    {
-      int order = 0;
-      for(String value : record.getListField(key))
-      {
-        ZNRecordRow row = new ZNRecordRow();
-        row.putField(ZNRECORD_ID, record.getId());
-        row.putField(LIST_KEY, key);
-        row.putField(LIST_VALUE, record.getSimpleField(key));
-        row.putField(LIST_VALUE_INDEX, ""+order);
-        order++;
-        result.add(row);
-      }
-    }
-    return result;
-  }
-  
-  public static List<ZNRecordRow> convertMapFields(ZNRecord record)
-  {
-    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
-    for(String key0 : record.getMapFields().keySet())
-    {
-      for(String key1 : record.getMapField(key0).keySet())
-      {
-        ZNRecordRow row = new ZNRecordRow();
-        row.putField(ZNRECORD_ID, record.getId());
-        row.putField(MAP_KEY, key0);
-        row.putField(MAP_SUBKEY, key1);
-        row.putField(MAP_VALUE, record.getMapField(key0).get(key1));
-        result.add(row);
-      }
-    }
-    return result;
-  }
-  
-  public static List<ZNRecordRow> flatten(ZNRecord record)
-  {
-    List<ZNRecordRow> result = convertMapFields(record);
-    result.addAll(convertListFields(record));
-    result.addAll(convertSimpleFields(record));
-    return result;
-  }
-  
-  public static List<ZNRecordRow> flatten(Collection<ZNRecord> recordList)
-  {
-    List<ZNRecordRow> result = new ArrayList<ZNRecordRow>();
-    for(ZNRecord record : recordList)
-    {
-      result.addAll(flatten(record));
-    }
-    return result;
-  }
-  
-  public static List<ZNRecordRow> getRowListFromMap(Map<String, List<ZNRecordRow>> rowMap, String key)
-  {
-    return rowMap.get(key);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/josql/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/josql/package-info.java b/helix-core/src/main/java/com/linkedin/helix/josql/package-info.java
deleted file mode 100644
index 7b4e85d..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/josql/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Jsql processor for Helix
- * 
- */
-package com.linkedin.helix.josql;
\ No newline at end of file