You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2013/04/23 22:42:08 UTC

svn commit: r1471134 - in /incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery: compiler/rewriter/ compiler/rewriter/rules/ metadata/ runtime/functions/node/

Author: prestonc
Date: Tue Apr 23 20:42:08 2013
New Revision: 1471134

URL: http://svn.apache.org/r1471134
Log:
Added the data source scan ID for each collection.

Modified:
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/VXQueryOptimizationContext.java
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnCollectionScalarEvaluatorFactory.java
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java

Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/VXQueryOptimizationContext.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/VXQueryOptimizationContext.java?rev=1471134&r1=1471133&r2=1471134&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/VXQueryOptimizationContext.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/VXQueryOptimizationContext.java Tue Apr 23 20:42:08 2013
@@ -20,6 +20,7 @@ import java.util.Map;
 import org.apache.vxquery.compiler.rewriter.rules.propagationpolicies.cardinality.Cardinality;
 import org.apache.vxquery.compiler.rewriter.rules.propagationpolicies.documentorder.DocumentOrder;
 import org.apache.vxquery.compiler.rewriter.rules.propagationpolicies.uniquenodes.UniqueNodes;
+import org.apache.vxquery.metadata.VXQueryCollectionDataSource;
 
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
@@ -35,6 +36,8 @@ public class VXQueryOptimizationContext 
     private final Map<ILogicalOperator, HashMap<Integer, UniqueNodes>> uniqueNodesOperatorVariableMap = new HashMap<ILogicalOperator, HashMap<Integer, UniqueNodes>>();
     private final Map<ILogicalOperator, Cardinality> cardinalityOperatorMap = new HashMap<ILogicalOperator, Cardinality>();
 
+    private final Map<String, VXQueryCollectionDataSource> dataSourceScanMap = new HashMap<String, VXQueryCollectionDataSource>();
+
     public VXQueryOptimizationContext(int varCounter, int frameSize,
             IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
@@ -44,6 +47,22 @@ public class VXQueryOptimizationContext 
                 expressionTypeComputer, nullableTypeComputer, physicalOptimizationConfig);
     }
 
+    public VXQueryCollectionDataSource getCollectionDataSourceMap(String collectionName) {
+        if (dataSourceScanMap.containsKey(collectionName)) {
+            return dataSourceScanMap.get(collectionName);
+        } else {
+            return null;
+        }
+    }
+    
+    public int getCollectionDataSourceMapSize() {
+        return dataSourceScanMap.size();
+    }
+    
+    public void putCollectionDataSourceMap(String collectionName, VXQueryCollectionDataSource ds) {
+        this.dataSourceScanMap.put(collectionName, ds);
+    }
+
     public Cardinality getCardinalityOperatorMap(ILogicalOperator op) {
         if (cardinalityOperatorMap.containsKey(op)) {
             return cardinalityOperatorMap.get(op);

Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java?rev=1471134&r1=1471133&r2=1471134&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java Tue Apr 23 20:42:08 2013
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
+import org.apache.vxquery.compiler.rewriter.VXQueryOptimizationContext;
 import org.apache.vxquery.datamodel.accessors.SequencePointable;
 import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
 import org.apache.vxquery.datamodel.values.ValueTag;
@@ -66,6 +67,7 @@ public class IntroduceCollectionRule imp
      */
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        VXQueryOptimizationContext vxqueryContext = (VXQueryOptimizationContext) context;
         VXQueryConstantValue constantValue;
 
         // Check if assign is for fn:Collection.
@@ -150,7 +152,14 @@ public class IntroduceCollectionRule imp
         // Build the new operator and update the query plan.
         List<Object> types = new ArrayList<Object>();
         types.add(SequenceType.create(AnyItemType.INSTANCE, Quantifier.QUANT_STAR));
-        VXQueryCollectionDataSource ds = new VXQueryCollectionDataSource(collectionName, types.toArray());
+        VXQueryCollectionDataSource ds;
+        if (vxqueryContext.getCollectionDataSourceMap(collectionName) != null) {
+            ds = vxqueryContext.getCollectionDataSourceMap(collectionName);
+        } else {
+            int nextId = vxqueryContext.getCollectionDataSourceMapSize() + 1;
+            ds = new VXQueryCollectionDataSource(nextId, collectionName, types.toArray());
+            vxqueryContext.putCollectionDataSourceMap(collectionName, ds);
+        }
         DataSourceScanOperator opNew = new DataSourceScanOperator(assign.getVariables(), ds);
         opNew.getInputs().addAll(assign.getInputs());
         opRef.setValue(opNew);

Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java?rev=1471134&r1=1471133&r2=1471134&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java Tue Apr 23 20:42:08 2013
@@ -31,13 +31,15 @@ import edu.uci.ics.hyracks.algebricks.co
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 
 public class VXQueryCollectionDataSource implements IDataSource<String> {
+    private final int dataSourceId;
     private final String collectionName;
 
     private final Object[] types;
 
     private IDataSourcePropertiesProvider propProvider;
 
-    public VXQueryCollectionDataSource(String file, Object[] types) {
+    public VXQueryCollectionDataSource(int id, String file, Object[] types) {
+        this.dataSourceId = id;
         this.collectionName = file;
         this.types = types;
         final IPhysicalPropertiesVector vec = new StructuralPropertiesVector(new RandomPartitioningProperty(
@@ -50,6 +52,10 @@ public class VXQueryCollectionDataSource
         };
     }
 
+    public int getDataSourceId() {
+        return dataSourceId;
+    }
+
     @Override
     public String getId() {
         return collectionName;

Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java?rev=1471134&r1=1471133&r2=1471134&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java Tue Apr 23 20:42:08 2013
@@ -40,11 +40,13 @@ import edu.uci.ics.hyracks.dataflow.std.
 
 public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
+    private int dataSourceId;
     private String collectionName;
 
     public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, String collectionName,
-            RecordDescriptor rDesc) {
+            int dataSourceId, RecordDescriptor rDesc) {
         super(spec, 1, 1);
+        this.dataSourceId = dataSourceId;
         this.collectionName = collectionName;
         recordDescriptors[0] = rDesc;
     }
@@ -61,8 +63,7 @@ public class VXQueryCollectionOperatorDe
         final InputSource in = new InputSource();
         final ArrayBackedValueStorage abvsFileNode = new ArrayBackedValueStorage();
         final int partitionId = ctx.getTaskAttemptId().getTaskId().getPartition();
-        // TODO Add the data source scan id.
-        final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partitionId);
+        final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partitionId, (short) dataSourceId);
 
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             @Override

Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java?rev=1471134&r1=1471133&r2=1471134&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java Tue Apr 23 20:42:08 2013
@@ -56,8 +56,10 @@ public class VXQueryMetadataProvider imp
             throws AlgebricksException {
         VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) dataSource;
         RecordDescriptor rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]);
-        IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds.getId(), rDesc);
+        IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds.getId(),
+                ds.getDataSourceId(), rDesc);
 
+        // TODO review if locations needs to be updated for parallel processing.
         String[] locations = new String[1];
         locations[0] = "nc1";
         AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(locations);

Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnCollectionScalarEvaluatorFactory.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnCollectionScalarEvaluatorFactory.java?rev=1471134&r1=1471133&r2=1471134&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnCollectionScalarEvaluatorFactory.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnCollectionScalarEvaluatorFactory.java Tue Apr 23 20:42:08 2013
@@ -62,7 +62,6 @@ public class FnCollectionScalarEvaluator
         final ArrayBackedValueStorage abvsFileNode = new ArrayBackedValueStorage();
         final InputSource in = new InputSource();
         final int partition = ctx.getTaskAttemptId().getTaskId().getPartition();
-        // TODO Add the data source scan id.
         final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition);
 
         return new AbstractTaggedValueArgumentScalarEvaluator(args) {

Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java?rev=1471134&r1=1471133&r2=1471134&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java Tue Apr 23 20:42:08 2013
@@ -57,7 +57,6 @@ public class FnDocScalarEvaluatorFactory
         final ByteBufferInputStream bbis = new ByteBufferInputStream();
         final DataInputStream di = new DataInputStream(bbis);
         final int partition = ctx.getTaskAttemptId().getTaskId().getPartition();
-        // TODO Add the data source scan id.
         final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition);
 
         return new AbstractTaggedValueArgumentScalarEvaluator(args) {