You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/12/04 22:09:35 UTC

svn commit: r1210225 - in /incubator/lcf/branches/CONNECTORS-286/warthog/src: main/java/org/apache/warthog/common/ main/java/org/apache/warthog/interfaces/ test/java/org/apache/warthog/tests/

Author: kwright
Date: Sun Dec  4 21:09:35 2011
New Revision: 1210225

URL: http://svn.apache.org/viewvc?rev=1210225&view=rev
Log:
Work out aggregation, and begin work on joins

Added:
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnBreakOnChange.java   (with props)
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnCount.java   (with props)
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMaxLong.java   (with props)
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMinLong.java   (with props)
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnSumLong.java   (with props)
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/AggregateAccessor.java   (with props)
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/WHColumnCalculator.java   (with props)
Modified:
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/AggregateRelationship.java
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/JoinRelationship.java
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/WHColumnDescription.java
    incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/SanityTest.java

Added: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnBreakOnChange.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnBreakOnChange.java?rev=1210225&view=auto
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnBreakOnChange.java (added)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnBreakOnChange.java Sun Dec  4 21:09:35 2011
@@ -0,0 +1,129 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.warthog.common;
+
+import org.apache.warthog.interfaces.*;
+
+/** This class is used to create a column upon which presorted aggregation works.
+* By signaling when the value changes from row to row, this column definition creates
+* "buckets", which can be used for both DISTINCT and GROUP BY functionality,
+* as long as the rows returned by the underlying accessor are ordered by the column
+* being selected.
+*/
+public class ColumnBreakOnChange implements WHColumnDescription
+{
+  /** Name of the column in the underlying accessor whose value changes delimit the buckets. */
+  protected String baseColumnName;
+  /** Name under which the bucket value is returned. */
+  protected String returnColumnName;
+
+  /** Constructor.
+  *@param baseColumnName is the column in the underlying accessor to watch for changes.
+  *@param returnColumnName is the name of the column this description returns.
+  */
+  public ColumnBreakOnChange(String baseColumnName, String returnColumnName)
+  {
+    this.baseColumnName = baseColumnName;
+    this.returnColumnName = returnColumnName;
+  }
+
+  /** Get the name of the return column */
+  public String getReturnColumnName()
+    throws WHException
+  {
+    return returnColumnName;
+  }
+
+  /** Create a return column calculator for this column.
+  * Each call returns a new calculator with no accumulated state.
+  */
+  public WHColumnCalculator createCalculator()
+    throws WHException
+  {
+    return new BreakCalculator(baseColumnName);
+  }
+
+  /** Calculator that accepts rows while the base column value stays the same,
+  * and tallies to that shared value.
+  */
+  protected static class BreakCalculator implements WHColumnCalculator
+  {
+    protected String baseColumnName;
+
+    /** Base column value of the rows accepted since the last tally; null if no row has
+    * been accepted yet (or if the last accepted row's value was null). */
+    protected WHValue previousValue = null;
+
+    public BreakCalculator(String baseColumnName)
+    {
+      this.baseColumnName = baseColumnName;
+    }
+
+    /** Check whether a row can be added to the column calculator.
+    * Does NOT actually change the result of the calculator!
+    *@return true if the row can be added and false if a tally should be done immediately.
+    */
+    public boolean canProcessRowValue(WHAccessor accessor)
+      throws WHException
+    {
+      // Nothing recorded yet, so any row may start the bucket.
+      if (previousValue == null)
+        return true;
+      WHValue newValue = accessor.getValue(baseColumnName);
+      // getValue() may return null (other calculators in this package check for that); a null
+      // value never matches the non-null previous value, so it forces a break rather than an NPE.
+      return newValue != null && newValue.equals(previousValue);
+    }
+
+    /** Check whether this calculator is capable of generating a final summary row value.
+    *@return true if it is capable.  tally() will only be called at the end for the summary
+    *     row if all columns are capable of generating a summary value.
+    */
+    public boolean canGenerateSummaryValue()
+      throws WHException
+    {
+      return (previousValue != null);
+    }
+
+    /** Feed a row to the column calculator, remembering its base column value.
+    */
+    public void processRowValue(WHAccessor accessor)
+      throws WHException
+    {
+      previousValue = accessor.getValue(baseColumnName);
+    }
+
+    /** Tally a final result, obtaining whatever current aggregate value there is.
+    * Also resets the calculator in preparation for another set of rows.
+    *@return the base column value of the current bucket, or null if none was recorded.
+    */
+    public WHValue tally()
+      throws WHException
+    {
+      WHValue rval = previousValue;
+      previousValue = null;
+      return rval;
+    }
+
+  }
+    
+}
+  
+  
\ No newline at end of file

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnBreakOnChange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnBreakOnChange.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnCount.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnCount.java?rev=1210225&view=auto
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnCount.java (added)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnCount.java Sun Dec  4 21:09:35 2011
@@ -0,0 +1,102 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.warthog.common;
+
+import org.apache.warthog.interfaces.*;
+
+/** This class is used to count the rows found.
+*/
+public class ColumnCount implements WHColumnDescription
+{
+  protected String returnColumnName;
+  
+  /** Constructor */
+  public ColumnCount(String returnColumnName)
+  {
+    this.returnColumnName = returnColumnName;
+  }
+  
+  /** Get the name of the return column */
+  public String getReturnColumnName()
+    throws WHException
+  {
+    return returnColumnName;
+  }
+  
+  /** Create a return column calculator for this column.
+  */
+  public WHColumnCalculator createCalculator()
+    throws WHException
+  {
+    return new CountCalculator();
+  }
+  
+  protected static class CountCalculator implements WHColumnCalculator
+  {
+    protected long counter = 0L;
+    
+    public CountCalculator()
+    {
+    }
+
+    /** Check whether a row can be added to to the column calculator.
+    * Does NOT actually change the result of the calculator!
+    *@return true if the row can be added and false if a tally should be done immediately.
+    */
+    public boolean canProcessRowValue(WHAccessor accessor)
+      throws WHException
+    {
+      return true;
+    }
+
+    /** Check whether this calculator is capable of generating a final summary row value.
+    *@return true if it is capable.  tally() will only be called at the end for the summary
+    *     row if all columns are capable of generating a summary value.
+    */
+    public boolean canGenerateSummaryValue()
+      throws WHException
+    {
+      return true;
+    }
+
+    /** Feed a row to the column calculator.
+    */
+    public void processRowValue(WHAccessor accessor)
+      throws WHException
+    {
+      counter++;
+    }
+    
+    /** Tally a final result, obtaining whatever current aggregate value there is.
+    * Also resets the calculator in preparation for another set of rows.
+    */
+    public WHValue tally()
+      throws WHException
+    {
+      WHValue rval = new LongValue(counter);
+      counter = 0L;
+      return rval;
+    }
+
+  }
+    
+}
+  
+  
\ No newline at end of file

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnCount.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnCount.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMaxLong.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMaxLong.java?rev=1210225&view=auto
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMaxLong.java (added)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMaxLong.java Sun Dec  4 21:09:35 2011
@@ -0,0 +1,132 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.warthog.common;
+
+import org.apache.warthog.interfaces.*;
+
+/** This class is used to create a column which finds the max of long values found in the specified
+* base column.  Null values in the base column are ignored.
+*/
+public class ColumnMaxLong implements WHColumnDescription
+{
+  /** Name of the column in the underlying accessor whose values are examined. */
+  protected String baseColumnName;
+  /** Name under which the maximum is returned. */
+  protected String returnColumnName;
+
+  /** Constructor.
+  *@param baseColumnName is the column whose LongValue values are examined.
+  *@param returnColumnName is the name of the column that will hold the maximum.
+  */
+  public ColumnMaxLong(String baseColumnName, String returnColumnName)
+  {
+    this.baseColumnName = baseColumnName;
+    this.returnColumnName = returnColumnName;
+  }
+
+  /** Get the name of the return column */
+  public String getReturnColumnName()
+    throws WHException
+  {
+    return returnColumnName;
+  }
+
+  /** Create a return column calculator for this column.
+  */
+  public WHColumnCalculator createCalculator()
+    throws WHException
+  {
+    return new MaxLongCalculator(baseColumnName);
+  }
+
+  /** Calculator that tracks the largest non-null LongValue seen since the last tally. */
+  protected static class MaxLongCalculator implements WHColumnCalculator
+  {
+    protected String baseColumnName;
+
+    /** True until a non-null value has been seen since the last tally. */
+    protected boolean firstValue = true;
+    /** Largest value seen since the last tally; only meaningful when firstValue is false. */
+    protected long max = 0L;
+
+    public MaxLongCalculator(String baseColumnName)
+    {
+      this.baseColumnName = baseColumnName;
+    }
+
+    /** Check whether a row can be added to the column calculator.
+    * Does NOT actually change the result of the calculator!
+    *@return true if the row can be added and false if a tally should be done immediately.
+    */
+    public boolean canProcessRowValue(WHAccessor accessor)
+      throws WHException
+    {
+      // This calculator can always accept new values
+      return true;
+    }
+
+    /** Check whether this calculator is capable of generating a final summary row value.
+    *@return true if it is capable.  tally() will only be called at the end for the summary
+    *     row if all columns are capable of generating a summary value.
+    */
+    public boolean canGenerateSummaryValue()
+      throws WHException
+    {
+      // Only possible once at least one non-null value has been seen
+      return !firstValue;
+    }
+
+    /** Feed a row to the column calculator.
+    *@throws WHException if the base column value is neither null nor a LongValue.
+    */
+    public void processRowValue(WHAccessor accessor)
+      throws WHException
+    {
+      WHValue value = accessor.getValue(baseColumnName);
+      // Null values do not participate in the maximum.
+      if (value == null)
+        return;
+      if (!(value instanceof LongValue))
+        throw new WHException("ColumnMaxLong can only be used with values of LongValue type");
+      long val = ((LongValue)value).getValue();
+      if (firstValue || val > max)
+        max = val;
+      firstValue = false;
+    }
+
+    /** Tally a final result, obtaining whatever current aggregate value there is.
+    * Also resets the calculator in preparation for another set of rows.  The running
+    * maximum is reset along with the flag, so a set of rows with no non-null values
+    * tallies to 0 instead of to a maximum left over from an earlier set.
+    */
+    public WHValue tally()
+      throws WHException
+    {
+      WHValue rval = new LongValue(max);
+      firstValue = true;
+      max = 0L;
+      return rval;
+    }
+
+  }
+    
+}
+  
+  
\ No newline at end of file

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMaxLong.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMaxLong.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMinLong.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMinLong.java?rev=1210225&view=auto
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMinLong.java (added)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMinLong.java Sun Dec  4 21:09:35 2011
@@ -0,0 +1,132 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.warthog.common;
+
+import org.apache.warthog.interfaces.*;
+
+/** This class is used to create a column which finds the minimum of long values found in the specified
+* base column.  Null values in the base column are ignored.
+*/
+public class ColumnMinLong implements WHColumnDescription
+{
+  /** Name of the column in the underlying accessor whose values are examined. */
+  protected String baseColumnName;
+  /** Name under which the minimum is returned. */
+  protected String returnColumnName;
+
+  /** Constructor.
+  *@param baseColumnName is the column whose LongValue values are examined.
+  *@param returnColumnName is the name of the column that will hold the minimum.
+  */
+  public ColumnMinLong(String baseColumnName, String returnColumnName)
+  {
+    this.baseColumnName = baseColumnName;
+    this.returnColumnName = returnColumnName;
+  }
+
+  /** Get the name of the return column */
+  public String getReturnColumnName()
+    throws WHException
+  {
+    return returnColumnName;
+  }
+
+  /** Create a return column calculator for this column.
+  */
+  public WHColumnCalculator createCalculator()
+    throws WHException
+  {
+    return new MinLongCalculator(baseColumnName);
+  }
+
+  /** Calculator that tracks the smallest non-null LongValue seen since the last tally. */
+  protected static class MinLongCalculator implements WHColumnCalculator
+  {
+    protected String baseColumnName;
+
+    /** True until a non-null value has been seen since the last tally. */
+    protected boolean firstValue = true;
+    /** Smallest value seen since the last tally; only meaningful when firstValue is false. */
+    protected long min = 0L;
+
+    public MinLongCalculator(String baseColumnName)
+    {
+      this.baseColumnName = baseColumnName;
+    }
+
+    /** Check whether a row can be added to the column calculator.
+    * Does NOT actually change the result of the calculator!
+    *@return true if the row can be added and false if a tally should be done immediately.
+    */
+    public boolean canProcessRowValue(WHAccessor accessor)
+      throws WHException
+    {
+      // This calculator can always accept new values
+      return true;
+    }
+
+    /** Check whether this calculator is capable of generating a final summary row value.
+    *@return true if it is capable.  tally() will only be called at the end for the summary
+    *     row if all columns are capable of generating a summary value.
+    */
+    public boolean canGenerateSummaryValue()
+      throws WHException
+    {
+      // Only possible once at least one non-null value has been seen
+      return !firstValue;
+    }
+
+    /** Feed a row to the column calculator.
+    *@throws WHException if the base column value is neither null nor a LongValue.
+    */
+    public void processRowValue(WHAccessor accessor)
+      throws WHException
+    {
+      WHValue value = accessor.getValue(baseColumnName);
+      // Null values do not participate in the minimum.
+      if (value == null)
+        return;
+      if (!(value instanceof LongValue))
+        throw new WHException("ColumnMinLong can only be used with values of LongValue type");
+      long val = ((LongValue)value).getValue();
+      if (firstValue || val < min)
+        min = val;
+      firstValue = false;
+    }
+
+    /** Tally a final result, obtaining whatever current aggregate value there is.
+    * Also resets the calculator in preparation for another set of rows.  The running
+    * minimum is reset along with the flag, so a set of rows with no non-null values
+    * tallies to 0 instead of to a minimum left over from an earlier set.
+    */
+    public WHValue tally()
+      throws WHException
+    {
+      WHValue rval = new LongValue(min);
+      firstValue = true;
+      min = 0L;
+      return rval;
+    }
+
+  }
+    
+}
+  
+  
\ No newline at end of file

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMinLong.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnMinLong.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnSumLong.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnSumLong.java?rev=1210225&view=auto
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnSumLong.java (added)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnSumLong.java Sun Dec  4 21:09:35 2011
@@ -0,0 +1,116 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.warthog.common;
+
+import org.apache.warthog.interfaces.*;
+
+/** This class is used to create a column which sums long values found in the specified
+* base column.
+*/
+public class ColumnSumLong implements WHColumnDescription
+{
+  protected String baseColumnName;
+  protected String returnColumnName;
+  
+  /** Constructor */
+  public ColumnSumLong(String baseColumnName, String returnColumnName)
+  {
+    this.baseColumnName = baseColumnName;
+    this.returnColumnName = returnColumnName;
+  }
+  
+  /** Get the name of the return column */
+  public String getReturnColumnName()
+    throws WHException
+  {
+    return returnColumnName;
+  }
+  
+  /** Create a return column calculator for this column.
+  */
+  public WHColumnCalculator createCalculator()
+    throws WHException
+  {
+    return new SumLongCalculator(baseColumnName);
+  }
+  
+  protected static class SumLongCalculator implements WHColumnCalculator
+  {
+    protected String baseColumnName;
+    
+    protected long sum = 0L;
+    
+    public SumLongCalculator(String baseColumnName)
+    {
+      this.baseColumnName = baseColumnName;
+    }
+
+    /** Check whether a row can be added to to the column calculator.
+    * Does NOT actually change the result of the calculator!
+    *@return true if the row can be added and false if a tally should be done immediately.
+    */
+    public boolean canProcessRowValue(WHAccessor accessor)
+      throws WHException
+    {
+      // This calculator can always accept new values
+      return true;
+    }
+
+    /** Check whether this calculator is capable of generating a final summary row value.
+    *@return true if it is capable.  tally() will only be called at the end for the summary
+    *     row if all columns are capable of generating a summary value.
+    */
+    public boolean canGenerateSummaryValue()
+      throws WHException
+    {
+      // This calculator can always generate a summary value
+      return true;
+    }
+
+    /** Feed a row to the column calculator.
+    */
+    public void processRowValue(WHAccessor accessor)
+      throws WHException
+    {
+      WHValue value = accessor.getValue(baseColumnName);
+      if (value != null)
+      {
+        if (!(value instanceof LongValue))
+          throw new WHException("ColumnSumLong can only be used with values of LongValue type");
+        sum += ((LongValue)value).getValue();
+      }
+    }
+    
+    /** Tally a final result, obtaining whatever current aggregate value there is.
+    * Also resets the calculator in preparation for another set of rows.
+    */
+    public WHValue tally()
+      throws WHException
+    {
+      WHValue rval = new LongValue(sum);
+      sum = 0L;
+      return rval;
+    }
+
+  }
+    
+}
+  
+  
\ No newline at end of file

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnSumLong.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/ColumnSumLong.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/AggregateAccessor.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/AggregateAccessor.java?rev=1210225&view=auto
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/AggregateAccessor.java (added)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/AggregateAccessor.java Sun Dec  4 21:09:35 2011
@@ -0,0 +1,197 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.warthog.interfaces;
+
+import org.apache.warthog.common.*;
+import java.util.*;
+
+/** Accessor class that returns aggregate rows.
+* Rows from the base accessor are fed to a set of column calculators.  Whenever any
+* calculator refuses a row, all calculators are tallied to produce one aggregate row.
+* Once the base accessor is exhausted, one final aggregate row is produced if every
+* calculator can generate a summary value and the tally yields values.
+*/
+public class AggregateAccessor implements WHAccessor
+{
+  protected WHAccessor baseAccessor;
+  protected WHColumnCalculator[] returnColumnCalculators;
+  protected String[] returnColumnNames;
+  /** ID of the current aggregate row, or null once the accessor is exhausted. */
+  protected LongValue currentRowID;
+  protected long nextRowNumber = 0L;
+  /** Set once the end of the base accessor has been reached and handled. */
+  protected boolean finalRowRendered = false;
+
+  /** Values of the current aggregate row, keyed by return column name. */
+  protected Map<String,WHValue> currentRowValues = new HashMap<String,WHValue>();
+    
+  /** Constructor.  Positions the accessor on the first aggregate row, if there is one.
+  *@param baseAccessor supplies the rows to aggregate.
+  *@param returnColumnCalculators are the calculators, one per return column.
+  *@param returnColumnNames are the return column names, parallel to returnColumnCalculators.
+  */
+  public AggregateAccessor(WHAccessor baseAccessor, WHColumnCalculator[] returnColumnCalculators, String[] returnColumnNames)
+    throws WHException
+  {
+    this.baseAccessor = baseAccessor;
+    this.returnColumnCalculators = returnColumnCalculators;
+    this.returnColumnNames = returnColumnNames;
+    goToNextLegalRow();
+  }
+
+  /** Move to the next aggregate row, consuming base accessor rows as needed.
+  * On return, either currentRowID and currentRowValues describe a new aggregate row,
+  * or currentRowID is null because there are no more rows.
+  */
+  protected void goToNextLegalRow()
+    throws WHException
+  {
+    while (true)
+    {
+      WHRowID baseRowID = baseAccessor.getCurrentRowID();
+      if (baseRowID == null)
+      {
+        if (!finalRowRendered)
+        {
+          finalRowRendered = true;
+          // End!  There may be a row of data we still need to return.  Only tally if
+          // every calculator can produce a summary value, and only emit a row if the
+          // tally actually yielded values.
+          if (allCalculatorsCanGenerateSummary() && tallyCalculators())
+          {
+            currentRowID = new LongValue(nextRowNumber);
+            return;
+          }
+        }
+        // No final row
+        currentRowID = null;
+        currentRowValues = null;
+        return;
+      }
+      // If any calculator refuses the current base row, tally ALL of them and set up a new row.
+      if (anyCalculatorRefusesRow() && tallyCalculators())
+      {
+        currentRowID = new LongValue(nextRowNumber);
+        nextRowNumber++;
+        // By returning without advancing the base accessor, we guarantee that the same
+        // base row will be offered again to the (now reset) calculators, which is part
+        // of the contract.
+        return;
+      }
+
+      // Add in the row to all column calculators
+      for (int i = 0 ; i < returnColumnCalculators.length ; i++)
+      {
+        returnColumnCalculators[i].processRowValue(baseAccessor);
+      }
+
+      // Go process the next base accessor row
+      baseAccessor.advance();
+    }
+  }
+
+  /** Check whether every calculator can generate a final summary value. */
+  private boolean allCalculatorsCanGenerateSummary()
+    throws WHException
+  {
+    for (int i = 0 ; i < returnColumnCalculators.length ; i++)
+    {
+      if (!returnColumnCalculators[i].canGenerateSummaryValue())
+        return false;
+    }
+    return true;
+  }
+
+  /** Check whether any calculator refuses the current base accessor row. */
+  private boolean anyCalculatorRefusesRow()
+    throws WHException
+  {
+    for (int i = 0 ; i < returnColumnCalculators.length ; i++)
+    {
+      if (!returnColumnCalculators[i].canProcessRowValue(baseAccessor))
+        return true;
+    }
+    return false;
+  }
+
+  /** Tally all calculators and, if they produced values, make those the current row's values.
+  *@return true if the calculators produced values, false if none of them did.
+  *@throws WHException if some calculators produced a value and others did not.
+  */
+  protected boolean tallyCalculators()
+    throws WHException
+  {
+    WHValue[] calculatedValues = new WHValue[returnColumnCalculators.length];
+    boolean seenValue = false;
+    boolean seenNoValue = false;
+    for (int i = 0 ; i < returnColumnCalculators.length ; i++)
+    {
+      calculatedValues[i] = returnColumnCalculators[i].tally();
+      if (calculatedValues[i] != null)
+        seenValue = true;
+      else
+        seenNoValue = true;
+    }
+    if (seenValue && seenNoValue)
+      throw new WHException("Column calculators cannot tally consistently");
+
+    if (seenValue)
+    {
+      currentRowValues.clear();
+      for (int i = 0 ; i < returnColumnCalculators.length ; i++)
+      {
+        currentRowValues.put(returnColumnNames[i],calculatedValues[i]);
+      }
+    }
+    return seenValue;
+  }
+
+  /** Advance to the next row.  Does nothing once the end of the sequence has been reached.
+  */
+  public void advance()
+    throws WHException
+  {
+    if (currentRowID != null)
+    {
+      goToNextLegalRow();
+    }
+  }
+
+  /** Read the current relationship row ID.  Null will be returned if we are
+  * at the end of the sequence.
+  */
+  public WHRowID getCurrentRowID()
+    throws WHException
+  {
+    return currentRowID;
+  }
+
+  /** Get the data for the current row and specified column.
+  *@throws WHException if the accessor is positioned past the end of the sequence.
+  */
+  public WHValue getValue(String columnName)
+    throws WHException
+  {
+    if (currentRowID != null)
+      return currentRowValues.get(columnName);
+    throw new WHException("Can't read beyond end of accessor");
+  }
+
+}

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/AggregateAccessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/AggregateAccessor.java
------------------------------------------------------------------------------
    svn:keywords = Id

Modified: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/AggregateRelationship.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/AggregateRelationship.java?rev=1210225&r1=1210224&r2=1210225&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/AggregateRelationship.java (original)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/AggregateRelationship.java Sun Dec  4 21:09:35 2011
@@ -25,25 +25,29 @@ package org.apache.warthog.interfaces;
 public class AggregateRelationship implements WHRelationship
 {
   protected WHRelationship relationshipToAggregate;
-  protected WHColumnDescription[] returnColumns;
-  protected String[] groupByColumns;
-
+  /** Descriptions of the output columns, in output order.  Despite the field name these
+  * are descriptions; actual calculators are created from them in buildAccessor(). */
+  protected WHColumnDescription[] returnColumnCalculators;
+  /** Output column names, parallel to returnColumnCalculators; filled in by the constructor. */
+  protected String[] returnColumnNames;
+  
   /** Constructor */
   public AggregateRelationship(WHRelationship relationshipToAggregate,
-    WHColumnDescription[] returnColumns,
-    String[] groupByColumns)
+    WHColumnDescription[] returnColumnCalculators)
+    throws WHException
   {
     this.relationshipToAggregate = relationshipToAggregate;
-    this.returnColumns = returnColumns;
-    this.groupByColumns = groupByColumns;
+    this.returnColumnCalculators = returnColumnCalculators;
+    // Cache the return column names once, in the same order as the column descriptions;
+    // getColumnNames() and buildAccessor() both reuse this array.
+    // NOTE(review): a null description array fails here with NullPointerException, not WHException.
+    returnColumnNames = new String[returnColumnCalculators.length];
+    for (int i = 0 ; i < returnColumnCalculators.length ; i++)
+    {
+      returnColumnNames[i] = returnColumnCalculators[i].getReturnColumnName();
+    }
   }
   
   /** Get the column names represented by the relationship. */
   public String[] getColumnNames()
     throws WHException
   {
-    // MHL
-    return null;
+    // Names were computed in the constructor.  NOTE(review): the internal array is returned
+    // without a defensive copy, so a caller that mutates it changes this relationship's columns.
+    return returnColumnNames;
   }
   
   /** Get the row accessor.  This determines
@@ -53,9 +57,12 @@ public class AggregateRelationship imple
   public WHAccessor buildAccessor()
     throws WHException
   {
-    // MHL
-    return null;
-    //return new FilterAccessor(relationshipToFilter.buildAccessor(),this,filterer);
+    // Each accessor gets its own freshly created calculators, because a calculator keeps
+    // running aggregate state between tally() calls and must not be shared.
+    WHColumnCalculator[] calculators = new WHColumnCalculator[returnColumnCalculators.length];
+    for (int i = 0 ; i < calculators.length ; i++)
+    {
+      calculators[i] = returnColumnCalculators[i].createCalculator();
+    }
+    // The underlying accessor supplies the input rows; its row order can matter for
+    // order-dependent aggregations such as break-on-change grouping.
+    return new AggregateAccessor(relationshipToAggregate.buildAccessor(),calculators,returnColumnNames);
   }
   
 }

Modified: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/JoinRelationship.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/JoinRelationship.java?rev=1210225&r1=1210224&r2=1210225&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/JoinRelationship.java (original)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/JoinRelationship.java Sun Dec  4 21:09:35 2011
@@ -19,20 +19,51 @@
 
 package org.apache.warthog.interfaces;
 
-/** This relationship type represents the joining of n other relationships, plus a joining
-* criteria, yielding a result with columns that include the columns from all of the
-* child relationships.
+/** This relationship type represents the joining of a primary relationship and a
+* secondary relationship, plus a join criterion, yielding a result whose columns
+* include the columns from both.  To build a three-way join, just nest join
+* relationships appropriately.
 */
 public class JoinRelationship implements WHRelationship
 {
-  // MHL
-	
+  /** The primary relationship; its columns come first in the result. */
+  protected WHRelationship primeRelationship;
+  /** Secondary-side column names; must be the same length as secondaryReturnColumns. */
+  protected String[] secondaryColumns;
+  /** Names appended, in order, after the primary columns to form the result columns. */
+  protected String[] secondaryReturnColumns;
+  /** Presumably used by buildAccessor() to build the secondary-side accessor -- TODO confirm. */
+  protected WHAccessorBuilder secondaryAccessorBuilder;
+
+  /** Result column names: the primary relationship's columns followed by secondaryReturnColumns. */
+  protected String[] returnColumns;
+  
+  /** Constructor.
+  *@param primeRelationship is the primary relationship, whose columns come first in the result.
+  *@param secondaryColumns are the secondary-side column names; must match secondaryReturnColumns in length.
+  *@param secondaryReturnColumns are the names appended, in order, after the primary columns in the result.
+  *@param secondaryAccessorBuilder presumably builds the secondary-side accessor for the join -- TODO confirm.
+  *@throws WHException if secondaryColumns and secondaryReturnColumns differ in length, or if the
+  *     primary relationship's column names cannot be read.
+  */
+  public JoinRelationship(WHRelationship primeRelationship,
+    String[] secondaryColumns,
+    String[] secondaryReturnColumns,
+    WHAccessorBuilder secondaryAccessorBuilder)
+    throws WHException
+  {
+    if (secondaryColumns.length != secondaryReturnColumns.length)
+      throw new WHException("Secondary columns and return columns must be the same number");
+    this.primeRelationship = primeRelationship;
+    this.secondaryColumns = secondaryColumns;
+    this.secondaryReturnColumns = secondaryReturnColumns;
+    this.secondaryAccessorBuilder = secondaryAccessorBuilder;
+    // Result columns = primary columns, then the secondary return columns.
+    // NOTE(review): duplicate names between the two sides are not detected here.
+    String[] primaryColumns = primeRelationship.getColumnNames();
+    returnColumns = new String[primaryColumns.length + secondaryReturnColumns.length];
+    int position = 0;
+    for (int i = 0 ; i < primaryColumns.length ; i++)
+    {
+      returnColumns[position++] = primaryColumns[i];
+    }
+    for (int i = 0 ; i < secondaryReturnColumns.length ; i++)
+    {
+      returnColumns[position++] = secondaryReturnColumns[i];
+    }
+  }
+  
   /** Get the column names represented by the relationship. */
   public String[] getColumnNames()
     throws WHException
   {
-    // MHL
-    return null;
+    // Computed once in the constructor.  NOTE(review): returned without a defensive copy.
+    return returnColumns;
   }
 
   /** Get the row accessor.  This determines

Added: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/WHColumnCalculator.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/WHColumnCalculator.java?rev=1210225&view=auto
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/WHColumnCalculator.java (added)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/WHColumnCalculator.java Sun Dec  4 21:09:35 2011
@@ -0,0 +1,65 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.warthog.interfaces;
+
+/** Interface for calculating the values of a single return column of a result set.
+* This interface describes both row-based calculations AND
+* cross-row aggregations.  In order to do that, each instance receives
+* data from each row and accumulates it until it is asked to tally a result.
+* Whenever any column reports, via canProcessRowValue(), that it cannot accept
+* the next row, tally() is called on all of the columns, yielding a row of data.
+*
+* For some kinds of aggregation to work right it may require the accessor to order
+* results in a certain way.  This should be readily achievable, however, with the use
+* of the proper indexes.
+* 
+* Note that the single-valued version of this abstraction is able to handle the
+* following well-known SQL constructs easily: SUM(), AVG(), MIN(), MAX(),
+* COUNT(), DISTINCT, and GROUP BY.
+*/
+public interface WHColumnCalculator
+{
+  /** Check whether a row can be added to the column calculator.
+  * Does NOT actually change the result of the calculator!
+  *@param accessor is the current accessor; the row under consideration is its current row.
+  *@return true if the row can be added, and false if it can't and a tally should be done immediately.
+  */
+  public boolean canProcessRowValue(WHAccessor accessor)
+    throws WHException;
+  
+  /** Check whether this calculator is capable of generating a final summary row value.
+  *@return true if it is capable.  tally() will only be called at the end for the summary
+  *     row if all columns are capable of generating a summary value.
+  */
+  public boolean canGenerateSummaryValue()
+    throws WHException;
+  
+  /** Feed a row to the column calculator, adding it to the current aggregate.
+  *@param accessor is the current accessor; its current row is the one being fed.
+  */
+  public void processRowValue(WHAccessor accessor)
+    throws WHException;
+  
+  /** Tally a final result, obtaining whatever current aggregate value there is.
+  * Also resets the calculator in preparation for another set of rows.
+  *@return the aggregate value for the rows processed since the previous tally.
+  */
+  public WHValue tally()
+    throws WHException;
+
+}
\ No newline at end of file

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/WHColumnCalculator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/WHColumnCalculator.java
------------------------------------------------------------------------------
    svn:keywords = Id

Modified: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/WHColumnDescription.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/WHColumnDescription.java?rev=1210225&r1=1210224&r2=1210225&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/WHColumnDescription.java (original)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/WHColumnDescription.java Sun Dec  4 21:09:35 2011
@@ -20,11 +20,9 @@
 package org.apache.warthog.interfaces;
 
 /** Interface describing a return value for a result set.
-* This interface must describe both row-based calculations AND
-* cross-row aggregations.  In order to do that, each instance receives
-* data from each row, and optionally coughs out a result.  Aggregation
-* may therefore require results to ordered in a certain way for the aggregation to 
-* work appropriately.
+* This interface describes the name of the returned column,
+* and provides a factory for creating a matching column calculator,
+* to go along with each accessor.
 */
 public interface WHColumnDescription
 {
@@ -32,16 +30,8 @@ public interface WHColumnDescription
   public String getReturnColumnName()
     throws WHException;
   
-  /** Feed a row to the column calculator.
-  *@return a non-null value if there is a result, or null if no result yet.
+  /** Create a return column calculator for this column.
+  * A separate calculator should be created for each accessor, since a calculator
+  * carries running aggregate state between tally() calls.
+  *@return a calculator producing this column's values.
   */
-  public WHValue appendRowValue(WHRelationship relationship, long rowID)
+  public WHColumnCalculator createCalculator()
     throws WHException;
-  
-  /** End, obtaining the current aggregate value, if any.  Also resets in preparation for calculation
-  * of another aggregate value.
-  */
-  public WHValue done()
-    throws WHException;
-
-}
\ No newline at end of file
+}

Modified: incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/SanityTest.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/SanityTest.java?rev=1210225&r1=1210224&r2=1210225&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/SanityTest.java (original)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/SanityTest.java Sun Dec  4 21:09:35 2011
@@ -427,6 +427,146 @@ public class SanityTest
 
   }
   
+  /** Exercise AggregateRelationship with sum, count, min, max and break-on-change
+  * column calculators, both over an ordered index and over the raw table.
+  */
+  @Test
+  public void aggregates()
+    throws Exception
+  {
+    WHTableStore ts = createInMemTableStore();
+    
+    // Build a table with two columns, one of which can be summed, and the other of which has repeating values.
+    // Rows inserted (colA, colB): (1, hello), (4, goodbye), (3, hello), (2, goodbye).
+    ts.beginTransaction();
+    ts.createTable("testtable",new String[]{"colA","colB"});
+    ts.commitTransaction();
+    
+    ts.beginTransaction();
+    WHTable table = ts.lookupTable("testtable");
+    table.insertRow(new String[]{"colA","colB"},new WHValue[]{new LongValue(1L),new StringValue("hello")});
+    ts.commitTransaction();
+    
+    ts.beginTransaction();
+    table = ts.lookupTable("testtable");
+    table.insertRow(new String[]{"colA","colB"},new WHValue[]{new LongValue(4L),new StringValue("goodbye")});
+    ts.commitTransaction();
+
+    ts.beginTransaction();
+    table = ts.lookupTable("testtable");
+    table.insertRow(new String[]{"colA","colB"},new WHValue[]{new LongValue(3L),new StringValue("hello")});
+    ts.commitTransaction();
+
+    ts.beginTransaction();
+    table = ts.lookupTable("testtable");
+    table.insertRow(new String[]{"colA","colB"},new WHValue[]{new LongValue(2L),new StringValue("goodbye")});
+    ts.commitTransaction();
+
+    // Build an index on this table to establish ordering on colB
+    ts.beginTransaction();
+    table = ts.lookupTable("testtable");
+    ts.createIndex("testindex",table,
+      new String[]{"colB"},
+      new String[]{"org.apache.warthog.common.StringComparatorAscending"},false);
+    ts.commitTransaction();
+
+    // Now use the index as a relationship (to provide ordering), and build an aggregate relationship based on it
+    ts.beginTransaction();
+    WHIndex index = ts.lookupIndex("testindex");
+    WHRelationship aggregateRelationship = new AggregateRelationship(index,new WHColumnDescription[]{
+      new ColumnSumLong("colA","sumA"),
+      new ColumnBreakOnChange("colB","uniqueB")});
+    // Grab the accessor and verify it is correct.  Ascending colB order puts the "goodbye"
+    // group (colA 4 + 2 = 6) before the "hello" group (colA 1 + 3 = 4).
+    WHAccessor accessor = aggregateRelationship.buildAccessor();
+    assertNotNull(accessor.getCurrentRowID());
+    WHValue value = accessor.getValue("sumA");
+    assertNotNull(value);
+    assertEquals("org.apache.warthog.common.LongValue",value.getClass().getName());
+    assertEquals(6L,((LongValue)value).getValue());
+    value = accessor.getValue("uniqueB");
+    assertNotNull(value);
+    assertEquals("org.apache.warthog.common.StringValue",value.getClass().getName());
+    assertEquals("goodbye",((StringValue)value).getValue());
+    accessor.advance();
+    assertNotNull(accessor.getCurrentRowID());
+    value = accessor.getValue("sumA");
+    assertNotNull(value);
+    assertEquals("org.apache.warthog.common.LongValue",value.getClass().getName());
+    assertEquals(4L,((LongValue)value).getValue());
+    value = accessor.getValue("uniqueB");
+    assertNotNull(value);
+    assertEquals("org.apache.warthog.common.StringValue",value.getClass().getName());
+    assertEquals("hello",((StringValue)value).getValue());
+    accessor.advance();
+    assertNull(accessor.getCurrentRowID());
+    ts.commitTransaction();
+
+    // Again use the index and build a different aggregate relationship based on it
+    ts.beginTransaction();
+    index = ts.lookupIndex("testindex");
+    aggregateRelationship = new AggregateRelationship(index,new WHColumnDescription[]{
+      new ColumnCount("countA"),
+      new ColumnBreakOnChange("colB","uniqueB")});
+    // Grab the accessor and verify it is correct: each colB group contains two rows.
+    accessor = aggregateRelationship.buildAccessor();
+    assertNotNull(accessor.getCurrentRowID());
+    value = accessor.getValue("countA");
+    assertNotNull(value);
+    assertEquals("org.apache.warthog.common.LongValue",value.getClass().getName());
+    assertEquals(2L,((LongValue)value).getValue());
+    value = accessor.getValue("uniqueB");
+    assertNotNull(value);
+    assertEquals("org.apache.warthog.common.StringValue",value.getClass().getName());
+    assertEquals("goodbye",((StringValue)value).getValue());
+    accessor.advance();
+    assertNotNull(accessor.getCurrentRowID());
+    value = accessor.getValue("countA");
+    assertNotNull(value);
+    assertEquals("org.apache.warthog.common.LongValue",value.getClass().getName());
+    assertEquals(2L,((LongValue)value).getValue());
+    value = accessor.getValue("uniqueB");
+    assertNotNull(value);
+    assertEquals("org.apache.warthog.common.StringValue",value.getClass().getName());
+    assertEquals("hello",((StringValue)value).getValue());
+    accessor.advance();
+    assertNull(accessor.getCurrentRowID());
+    ts.commitTransaction();
+
+    // Use the table and calculate a row count; with only ColumnCount present, all four
+    // rows are expected to collapse into a single result row.
+    ts.beginTransaction();
+    table = ts.lookupTable("testtable");
+    aggregateRelationship = new AggregateRelationship(table,new WHColumnDescription[]{
+      new ColumnCount("count")});
+    // Grab the accessor and verify it is correct
+    accessor = aggregateRelationship.buildAccessor();
+    assertNotNull(accessor.getCurrentRowID());
+    value = accessor.getValue("count");
+    assertNotNull(value);
+    assertEquals("org.apache.warthog.common.LongValue",value.getClass().getName());
+    assertEquals(4L,((LongValue)value).getValue());
+    accessor.advance();
+    assertNull(accessor.getCurrentRowID());
+    ts.commitTransaction();
+
+    // Try out min and max over colA (values 1, 4, 3, 2), again as a single result row
+    ts.beginTransaction();
+    table = ts.lookupTable("testtable");
+    aggregateRelationship = new AggregateRelationship(table,new WHColumnDescription[]{
+      new ColumnMinLong("colA","minA"),
+      new ColumnMaxLong("colA","maxA")});
+    // Grab the accessor and verify it is correct
+    accessor = aggregateRelationship.buildAccessor();
+    assertNotNull(accessor.getCurrentRowID());
+    value = accessor.getValue("minA");
+    assertNotNull(value);
+    assertEquals("org.apache.warthog.common.LongValue",value.getClass().getName());
+    assertEquals(1L,((LongValue)value).getValue());
+    value = accessor.getValue("maxA");
+    assertNotNull(value);
+    assertEquals("org.apache.warthog.common.LongValue",value.getClass().getName());
+    assertEquals(4L,((LongValue)value).getValue());
+    accessor.advance();
+    assertNull(accessor.getCurrentRowID());
+    ts.commitTransaction();
+
+  }
+  
   // Protected methods
   
   protected WHTableStore createInMemTableStore()