You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/02/01 09:29:40 UTC
[2/3] incubator-asterixdb-hyracks git commit: Add Support for Upsert
Operation
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
new file mode 100644
index 0000000..4702361
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class IndexInsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
+
+ private final List<LogicalVariable> primaryKeys;
+ private final List<LogicalVariable> secondaryKeys;
+ private final ILogicalExpression filterExpr;
+ private final IDataSourceIndex<?, ?> dataSourceIndex;
+ private final List<LogicalVariable> additionalFilteringKeys;
+ private final List<LogicalVariable> prevSecondaryKeys;
+ private final LogicalVariable prevAdditionalFilteringKey;
+
+ public IndexInsertDeleteUpsertPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
+ IDataSourceIndex<?, ?> dataSourceIndex, List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable prevAdditionalFilteringKey) {
+ this.primaryKeys = primaryKeys;
+ this.secondaryKeys = secondaryKeys;
+ if (filterExpr != null) {
+ this.filterExpr = filterExpr.getValue();
+ } else {
+ this.filterExpr = null;
+ }
+ this.dataSourceIndex = dataSourceIndex;
+ this.additionalFilteringKeys = additionalFilteringKeys;
+ this.prevSecondaryKeys = prevSecondaryKeys;
+ this.prevAdditionalFilteringKey = prevAdditionalFilteringKey;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.INDEX_INSERT_DELETE;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
+ scanVariables.addAll(primaryKeys);
+ scanVariables.add(new LogicalVariable(-1));
+ IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider()
+ .computePropertiesVector(scanVariables);
+ r.getLocalProperties().clear();
+ IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
+ requirements[0] = r;
+ return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ IndexInsertDeleteUpsertOperator insertDeleteUpsertOp = (IndexInsertDeleteUpsertOperator) op;
+ IMetadataProvider mp = context.getMetadataProvider();
+
+ JobSpecification spec = builder.getJobSpec();
+ RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteUpsertOp);
+ if (insertDeleteUpsertOp.getOperation() == Kind.INSERT) {
+ runtimeAndConstraints = mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+ primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, false);
+ } else if (insertDeleteUpsertOp.getOperation() == Kind.DELETE) {
+ runtimeAndConstraints = mp.getIndexDeleteRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+ primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
+ } else if (insertDeleteUpsertOp.getOperation() == Kind.UPSERT) {
+ runtimeAndConstraints = mp.getIndexUpsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+ primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevSecondaryKeys,
+ prevAdditionalFilteringKey, inputDesc, context, spec);
+ }
+ builder.contributeHyracksOperator(insertDeleteUpsertOp, runtimeAndConstraints.first);
+ builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+ ILogicalOperator src = insertDeleteUpsertOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, insertDeleteUpsertOp, 0);
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+
+ public List<LogicalVariable> getPrevSecondaryKeys() {
+ return prevSecondaryKeys;
+ }
+
+ public LogicalVariable getPrevFilteringKeys() {
+ return prevAdditionalFilteringKey;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
deleted file mode 100644
index c774f44..0000000
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.JobSpecification;
-
-@SuppressWarnings("rawtypes")
-public class InsertDeletePOperator extends AbstractPhysicalOperator {
-
- private LogicalVariable payload;
- private List<LogicalVariable> keys;
- private IDataSource<?> dataSource;
- private final List<LogicalVariable> additionalFilteringKeys;
-
- public InsertDeletePOperator(LogicalVariable payload, List<LogicalVariable> keys,
- List<LogicalVariable> additionalFilteringKeys, IDataSource dataSource) {
- this.payload = payload;
- this.keys = keys;
- this.dataSource = dataSource;
- this.additionalFilteringKeys = additionalFilteringKeys;
- }
-
- @Override
- public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.INSERT_DELETE;
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
- }
-
- @Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
- List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
- scanVariables.addAll(keys);
- scanVariables.add(new LogicalVariable(-1));
- IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computePropertiesVector(scanVariables);
- r.getLocalProperties().clear();
- IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
- requirements[0] = r;
- return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
- InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
- IMetadataProvider mp = context.getMetadataProvider();
- IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
- JobSpecification spec = builder.getJobSpec();
- RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
- context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
-
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
- if (insertDeleteOp.getOperation() == Kind.INSERT) {
- runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
- additionalFilteringKeys, inputDesc, context, spec, false);
- } else {
- runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
- additionalFilteringKeys, inputDesc, context, spec);
- }
-
- builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
- builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
- ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
- }
-
- @Override
- public boolean isMicroOperator() {
- return false;
- }
-
- @Override
- public boolean expensiveThanMaterialization() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
new file mode 100644
index 0000000..d844f37
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+
+@SuppressWarnings("rawtypes")
+public class InsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
+
+ private LogicalVariable payload;
+ private List<LogicalVariable> keys;
+ private IDataSource<?> dataSource;
+ private final List<LogicalVariable> additionalFilteringKeys;
+ private final Kind operation;
+ private final LogicalVariable prevPayload;
+
+ public InsertDeleteUpsertPOperator(LogicalVariable payload, List<LogicalVariable> keys,
+ List<LogicalVariable> additionalFilteringKeys, IDataSource dataSource, Kind operation,
+ LogicalVariable prevPayload) {
+ this.payload = payload;
+ this.keys = keys;
+ this.dataSource = dataSource;
+ this.additionalFilteringKeys = additionalFilteringKeys;
+ this.operation = operation;
+ this.prevPayload = prevPayload;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.INSERT_DELETE;
+ }
+
+ // Delivered Properties of this will be (Sorted on PK, Partitioned on PK)
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
+ scanVariables.addAll(keys);
+ scanVariables.add(new LogicalVariable(-1));
+ IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computePropertiesVector(scanVariables);
+ r.getLocalProperties().clear();
+ IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
+ requirements[0] = r;
+ return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op;
+ IMetadataProvider mp = context.getMetadataProvider();
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+ JobSpecification spec = builder.getJobSpec();
+ RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
+ if (operation == Kind.INSERT) {
+ runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+ additionalFilteringKeys, inputDesc, context, spec, false);
+ } else if (operation == Kind.DELETE) {
+ runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+ additionalFilteringKeys, inputDesc, context, spec);
+ } else if (operation == Kind.UPSERT) {
+ runtimeAndConstraints = mp.getUpsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+ additionalFilteringKeys, prevPayload, inputDesc, context, spec);
+ } else {
+ throw new AlgebricksException("Unsupported Operation " + operation);
+ }
+
+ builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
+ builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+ ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
index e1983d2..98427fe 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
@@ -35,7 +35,7 @@ import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 17f2285..cf0f1c2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -38,10 +38,10 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -325,12 +325,16 @@ public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisito
}
@Override
- public String visitInsertDeleteOperator(InsertDeleteOperator op, Integer indent) throws AlgebricksException {
+ public String visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+ String header = getIndexOpString(op.getOperation());
addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from ")
.append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
+ if (op.getOperation() == Kind.UPSERT) {
+ buffer.append(" out: ([" + op.getPrevRecordVar() + "] <-{record-before-upsert}) ");
+ }
if (op.isBulkload()) {
buffer.append(" [bulkload]");
}
@@ -338,19 +342,38 @@ public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisito
}
@Override
- public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Integer indent)
+ public String visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Integer indent)
throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+ String header = getIndexOpString(op.getOperation());
addIndent(buffer, indent).append(header).append(op.getIndexName()).append(" on ")
.append(op.getDataSourceIndex().getDataSource()).append(" from ");
- pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+ if (op.getOperation() == Kind.UPSERT) {
+ buffer.append(" replace:");
+ pprintExprList(op.getPrevSecondaryKeyExprs(), buffer, indent);
+ buffer.append(" with:");
+ pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+ } else {
+ pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+ }
if (op.isBulkload()) {
buffer.append(" [bulkload]");
}
return buffer.toString();
}
+ public String getIndexOpString(Kind opKind) {
+ switch (opKind) {
+ case DELETE:
+ return "delete from ";
+ case INSERT:
+ return "insert into ";
+ case UPSERT:
+ return "upsert into ";
+ }
+ return null;
+ }
+
@Override
public String visitTokenizeOperator(TokenizeOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 285812c..53c8b69 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -29,9 +29,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -113,9 +113,9 @@ public interface ILogicalOperatorVisitor<R, T> {
public R visitWriteResultOperator(WriteResultOperator op, T arg) throws AlgebricksException;
- public R visitInsertDeleteOperator(InsertDeleteOperator op, T tag) throws AlgebricksException;
+ public R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T tag) throws AlgebricksException;
- public R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T tag) throws AlgebricksException;
+ public R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T tag) throws AlgebricksException;
public R visitExternalDataLookupOperator(ExternalDataLookupOperator op, T arg) throws AlgebricksException;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
index b5ce212..73628c1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
@@ -19,8 +19,8 @@
package org.apache.hyracks.algebricks.core.algebra.visitors;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
@@ -43,12 +43,12 @@ public interface IQueryOperatorVisitor<R, T> extends ILogicalOperatorVisitor<R,
}
@Override
- public default R visitInsertDeleteOperator(InsertDeleteOperator op, T arg) {
+ public default R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T arg) {
throw new UnsupportedOperationException();
}
@Override
- public default R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T arg) {
+ public default R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T arg) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 8f9ab9f..8bf3dbb 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -84,7 +84,7 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
- throws AlgebricksException {
+ throws AlgebricksException {
PigletFileDataSource ds = (PigletFileDataSource) dataSource;
FileSplit[] fileSplits = ds.getFileSplits();
@@ -141,15 +141,15 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
@Override
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
- throws AlgebricksException {
+ throws AlgebricksException {
PigletFileDataSink ds = (PigletFileDataSink) sink;
FileSplit[] fileSplits = ds.getFileSplits();
String[] locations = new String[fileSplits.length];
for (int i = 0; i < fileSplits.length; ++i) {
locations[i] = fileSplits[i].getNodeName();
}
- IPushRuntimeFactory prf = new SinkWriterRuntimeFactory(printColumns, printerFactories, fileSplits[0]
- .getLocalFile().getFile(), PrinterBasedWriterFactory.INSTANCE, inputDesc);
+ IPushRuntimeFactory prf = new SinkWriterRuntimeFactory(printColumns, printerFactories,
+ fileSplits[0].getLocalFile().getFile(), PrinterBasedWriterFactory.INSTANCE, inputDesc);
AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(locations);
return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(prf, constraint);
}
@@ -178,15 +178,11 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
- IDataSourceIndex<String, String> dataSource,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalNonKeyFields,
- ILogicalExpression filterExpr,
- RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec, boolean bulkload) throws AlgebricksException {
+ IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+ ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
+ boolean bulkload) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
}
@@ -197,16 +193,16 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
+ throws AlgebricksException {
// TODO Auto-generated method stub
return null;
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
- IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+ IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
@@ -235,4 +231,24 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
return null;
}
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<String> dataSource,
+ IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+ LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, LogicalVariable prevPayload,
+ RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+ IDataSourceIndex<String, String> dataSourceIndex, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
+ ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 394524b..d85ffe9 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -45,9 +44,10 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOpe
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
@@ -62,8 +62,8 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleS
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeletePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeletePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeleteUpsertPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
@@ -95,7 +95,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
}
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
// if (context.checkIfInDontApplySet(this, op)) {
// return false;
@@ -116,7 +117,7 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
}
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp,
IOptimizationContext context) throws AlgebricksException {
PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
@@ -152,8 +153,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
if (gby.getNestedPlans().size() == 1) {
ILogicalPlan p0 = gby.getNestedPlans().get(0);
if (p0.getRoots().size() == 1) {
- if (gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE
- || gby.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE) {
+ if (gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE || gby
+ .getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE) {
if (!topLevelOp) {
throw new NotImplementedException(
"External hash group-by for nested grouping is not implemented.");
@@ -213,8 +214,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
}
}
if (topLevelOp) {
- op.setPhysicalOperator(new StableSortPOperator(physicalOptimizationConfig
- .getMaxFramesExternalSort()));
+ op.setPhysicalOperator(
+ new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
} else {
op.setPhysicalOperator(new InMemoryStableSortPOperator());
}
@@ -282,12 +283,13 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
additionalFilteringKeys = new ArrayList<LogicalVariable>();
getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
}
- op.setPhysicalOperator(new WriteResultPOperator(opLoad.getDataSource(), payload, keys,
- additionalFilteringKeys));
+ op.setPhysicalOperator(
+ new WriteResultPOperator(opLoad.getDataSource(), payload, keys, additionalFilteringKeys));
break;
}
- case INSERT_DELETE: {
- InsertDeleteOperator opLoad = (InsertDeleteOperator) op;
+ case INSERT_DELETE_UPSERT: {
+ // Primary index
+ InsertDeleteUpsertOperator opLoad = (InsertDeleteUpsertOperator) op;
LogicalVariable payload;
List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
List<LogicalVariable> additionalFilteringKeys = null;
@@ -297,16 +299,17 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
}
if (opLoad.isBulkload()) {
- op.setPhysicalOperator(new BulkloadPOperator(payload, keys, additionalFilteringKeys, opLoad
- .getDataSource()));
+ op.setPhysicalOperator(
+ new BulkloadPOperator(payload, keys, additionalFilteringKeys, opLoad.getDataSource()));
} else {
- op.setPhysicalOperator(new InsertDeletePOperator(payload, keys, additionalFilteringKeys, opLoad
- .getDataSource()));
+ op.setPhysicalOperator(new InsertDeleteUpsertPOperator(payload, keys, additionalFilteringKeys,
+ opLoad.getDataSource(), opLoad.getOperation(), opLoad.getPrevRecordVar()));
}
break;
}
- case INDEX_INSERT_DELETE: {
- IndexInsertDeleteOperator opInsDel = (IndexInsertDeleteOperator) op;
+ case INDEX_INSERT_DELETE_UPSERT: {
+ // Secondary index
+ IndexInsertDeleteUpsertOperator opInsDel = (IndexInsertDeleteUpsertOperator) op;
List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
List<LogicalVariable> additionalFilteringKeys = null;
@@ -317,13 +320,24 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
getKeys(opInsDel.getAdditionalFilteringExpressions(), additionalFilteringKeys);
}
if (opInsDel.isBulkload()) {
- op.setPhysicalOperator(new IndexBulkloadPOperator(primaryKeys, secondaryKeys,
- additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+ op.setPhysicalOperator(
+ new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
+ opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
} else {
- op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys,
- additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+ List<LogicalVariable> prevSecondaryKeys = null;
+ LogicalVariable prevAdditionalFilteringKey = null;
+ if (opInsDel.getOperation() == Kind.UPSERT) {
+ prevSecondaryKeys = new ArrayList<LogicalVariable>();
+ getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
+ if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
+ prevAdditionalFilteringKey = ((VariableReferenceExpression) (opInsDel
+ .getPrevAdditionalFilteringExpression()).getValue()).getVariableReference();
+ }
+ }
+ op.setPhysicalOperator(new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
+ additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
+ prevSecondaryKeys, prevAdditionalFilteringKey));
}
-
break;
}
@@ -335,8 +349,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
// Tokenize Operator only operates with a bulk load on a data set with an index
if (opTokenize.isBulkload()) {
- op.setPhysicalOperator(new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize
- .getDataSourceIndex()));
+ op.setPhysicalOperator(
+ new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize.getDataSourceIndex()));
}
break;
}
@@ -415,8 +429,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
int n = aggOp.getExpressions().size();
List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<Mutable<ILogicalExpression>>();
for (int i = 0; i < n; i++) {
- ILogicalExpression mergeExpr = mergeAggregationExpressionFactory.createMergeAggregation(
- originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
+ ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
+ .createMergeAggregation(originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
if (mergeExpr == null) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index a89e895..70d4f80 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.dataflow.common.comm.io;
import java.io.DataInputStream;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import org.apache.hyracks.api.comm.FrameConstants;
import org.apache.hyracks.api.comm.FrameHelper;
@@ -72,9 +73,8 @@ public class FrameTupleAccessor implements IFrameTupleAccessor {
@Override
public int getTupleStartOffset(int tupleIndex) {
- int offset = tupleIndex == 0 ?
- FrameConstants.TUPLE_START_OFFSET :
- IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - 4 * tupleIndex);
+ int offset = tupleIndex == 0 ? FrameConstants.TUPLE_START_OFFSET
+ : IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - 4 * tupleIndex);
return start + offset;
}
@@ -85,16 +85,15 @@ public class FrameTupleAccessor implements IFrameTupleAccessor {
@Override
public int getTupleEndOffset(int tupleIndex) {
- return start + IntSerDeUtils
- .getInt(buffer.array(), tupleCountOffset - FrameConstants.SIZE_LEN * (tupleIndex + 1));
+ return start
+ + IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - FrameConstants.SIZE_LEN * (tupleIndex + 1));
}
@Override
public int getFieldStartOffset(int tupleIndex, int fIdx) {
- return fIdx == 0 ?
- 0 :
- IntSerDeUtils
- .getInt(buffer.array(), getTupleStartOffset(tupleIndex) + (fIdx - 1) * FrameConstants.SIZE_LEN);
+ return fIdx == 0 ? 0
+ : IntSerDeUtils.getInt(buffer.array(),
+ getTupleStartOffset(tupleIndex) + (fIdx - 1) * FrameConstants.SIZE_LEN);
}
@Override
@@ -161,4 +160,44 @@ public class FrameTupleAccessor implements IFrameTupleAccessor {
public int getFieldCount() {
return recordDescriptor.getFieldCount();
}
+
+ /*
+ * The two methods below can be used for debugging.
+ * They are safe as they don't print records. Printing records
+ * using IserializerDeserializer can print incorrect results or throw exceptions.
+ * A better way yet would be to use record pointable.
+ */
+ public void prettyPrint(String prefix, int[] recordFields) {
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream dis = new DataInputStream(bbis);
+ int tc = getTupleCount();
+ StringBuilder sb = new StringBuilder();
+ sb.append(prefix).append("TC: " + tc).append("\n");
+ for (int i = 0; i < tc; ++i) {
+ prettyPrint(i, bbis, dis, sb, recordFields);
+ }
+ System.err.println(sb.toString());
+ }
+
+ protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb,
+ int[] recordFields) {
+ Arrays.sort(recordFields);
+ sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
+ for (int j = 0; j < getFieldCount(); ++j) {
+ sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") ");
+ sb.append("{");
+ bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j));
+ try {
+ if (Arrays.binarySearch(recordFields, j) >= 0) {
+ sb.append("a record field: only print using pointable");
+ } else {
+ sb.append(recordDescriptor.getFields()[j].deserialize(dis));
+ }
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ }
+ sb.append("}");
+ }
+ sb.append("\n");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
index abf9c37..15ebe52 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
@@ -21,9 +21,9 @@ package org.apache.hyracks.dataflow.common.data.accessors;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-public final class FrameTupleReference implements IFrameTupleReference {
- private IFrameTupleAccessor fta;
- private int tIndex;
+public class FrameTupleReference implements IFrameTupleReference {
+ protected IFrameTupleAccessor fta;
+ protected int tIndex;
public void reset(IFrameTupleAccessor fta, int tIndex) {
this.fta = fta;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
index 0399ede..f1a411b 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
@@ -70,10 +70,12 @@ public class RangePredicate extends AbstractSearchPredicate {
this.highKeyCmp = highKeyCmp;
}
+ @Override
public MultiComparator getLowKeyComparator() {
return lowKeyCmp;
}
+ @Override
public MultiComparator getHighKeyComparator() {
return highKeyCmp;
}
@@ -86,6 +88,7 @@ public class RangePredicate extends AbstractSearchPredicate {
this.highKeyCmp = highKeyCmp;
}
+ @Override
public ITupleReference getLowKey() {
return lowKey;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
index 5aac5a6..7ec4a30 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
@@ -22,33 +22,42 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
/**
- * This operation callback allows for arbitrary actions to be taken while traversing
- * an index structure. The {@link IModificationOperationCallback} will be called on
+ * This operation callback allows for arbitrary actions to be taken while traversing
+ * an index structure. The {@link IModificationOperationCallback} will be called on
* all modifying operations (e.g. insert, update, delete...) for all indexes.
- *
* @author zheilbron
*/
public interface IModificationOperationCallback {
+ public enum Operation {
+ INSERT,
+ DELETE
+ }
/**
- * This method is called on a tuple that is about to traverse an index's structure
- * (i.e. before any pages are pinned or latched).
- *
+ * This method is called on a tuple that is about to traverse an index's structure
+ * (i.e. before any pages are pinned or latched).
* The format of the tuple is the format that would be stored in the index itself.
- *
- * @param tuple the tuple that is about to be operated on
+ * @param tuple
+ * the tuple that is about to be operated on
*/
public void before(ITupleReference tuple) throws HyracksDataException;
/**
- * This method is called on a tuple when a tuple with a matching key is found for the
- * current operation. It is possible that tuple is null, in which case no tuple with a
+ * This method is called on a tuple when a tuple with a matching key is found for the
+ * current operation. It is possible that tuple is null, in which case no tuple with a
* matching key was found.
- *
- * When found is called, the leaf page where the tuple resides will be pinned and latched,
+ * When found is called, the leaf page where the tuple resides will be pinned and latched,
* so blocking operations should be avoided.
- *
- * @param tuple a tuple with a matching key, otherwise null if none exists
+ * @param tuple
+ * a tuple with a matching key, otherwise null if none exists
*/
public void found(ITupleReference before, ITupleReference after) throws HyracksDataException;
+
+ /**
+ * This call specifies the next operation to be performed. It is used to allow
+ * a single operator to perform different operations per tuple
+ * @param op
+ * @throws HyracksDataException
+ */
+ public void setOp(Operation op) throws HyracksDataException;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
index 08ad1d0..9b1cd47 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
@@ -22,50 +22,56 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
/**
- * This operation callback allows for arbitrary actions to be taken while traversing
- * an index structure. The {@link ISearchOperationCallback} will be called on
+ * This operation callback allows for arbitrary actions to be taken while traversing
+ * an index structure. The {@link ISearchOperationCallback} will be called on
* all search operations for ordered indexes only.
- *
* @author zheilbron
*/
public interface ISearchOperationCallback {
/**
- * During an index search operation, this method will be called on tuples as they are
- * passed by with a search cursor. This call will be invoked while a leaf page is latched
- * and pinned. If the call returns false, then the page will be unlatched and unpinned and
- * {@link #reconcile(ITupleReference)} will be called with the tuple that was not proceeded
+ * After the harness enters the operation components and before an index search operation starts,
+ * this method will be called on the search key.
+ * @param tuple
+ * the tuple containing the search key (expected to be a point search key)
+ */
+ public void before(ITupleReference tuple) throws HyracksDataException;
+
+ /**
+ * During an index search operation, this method will be called on tuples as they are
+ * passed by with a search cursor. This call will be invoked while a leaf page is latched
+ * and pinned. If the call returns false, then the page will be unlatched and unpinned and
+ * {@link #reconcile(ITupleReference)} will be called with the tuple that was not proceeded
* on.
- *
- * @param tuple the tuple that is being passed over by the search cursor
+ * @param tuple
+ * the tuple that is being passed over by the search cursor
* @return true to proceed otherwise false to unlatch and unpin, leading to reconciliation
*/
public boolean proceed(ITupleReference tuple) throws HyracksDataException;
/**
- * This method is only called on a tuple that was not 'proceeded' on
- * (see {@link #proceed(ITupleReference)}). This method allows an opportunity to reconcile
- * by performing any necessary actions before resuming the search (e.g. a try-lock may have
+ * This method is only called on a tuple that was not 'proceeded' on
+ * (see {@link #proceed(ITupleReference)}). This method allows an opportunity to reconcile
+ * by performing any necessary actions before resuming the search (e.g. a try-lock may have
* failed in the proceed call, and now in reconcile we should take a full (blocking) lock).
- *
- * @param tuple the tuple that failed to proceed
+ * @param tuple
+ * the tuple that failed to proceed
*/
public void reconcile(ITupleReference tuple) throws HyracksDataException;
/**
- * This method is only called on a tuple that was reconciled on, but not found after
- * retraversing. This method allows an opportunity to cancel some action that was taken in
+ * This method is only called on a tuple that was reconciled on, but not found after
+ * retraversing. This method allows an opportunity to cancel some action that was taken in
* {@link #reconcile(ITupleReference))}.
- *
- * @param tuple the tuple that was previously reconciled
+ * @param tuple
+ * the tuple that was previously reconciled
*/
public void cancel(ITupleReference tuple) throws HyracksDataException;
-
+
/**
* This method is only called on a tuple that was reconciled on, and found after
- * retraversing. This method allows an opportunity to do some subsequent action that was
+ * retraversing. This method allows an opportunity to do some subsequent action that was
* taken in {@link #reconcile(ITupleReference))}.
- *
* @param tuple
* the tuple that was previously reconciled
*/
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
index 94a6a27..1c0d143 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
@@ -21,10 +21,20 @@ package org.apache.hyracks.storage.am.common.api;
import java.io.Serializable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
public interface ISearchPredicate extends Serializable {
- public MultiComparator getLowKeyComparator();
+ public MultiComparator getLowKeyComparator();
- public MultiComparator getHighKeyComparator();
+ public MultiComparator getHighKeyComparator();
+
+ /**
+ * Get the search key to be used with point search operation on primary index.
+ * This method will only be called with point search predicates that only happen in primary index.
+ * @return
+ * @throws HyracksDataException
+ */
+ public ITupleReference getLowKey();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
index b632ba9..e8ab8dc 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
@@ -27,7 +27,7 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
/**
* Dummy operation callback that simply does nothing.
*/
-public enum NoOpOperationCallback implements IModificationOperationCallback, ISearchOperationCallback {
+public enum NoOpOperationCallback implements IModificationOperationCallback,ISearchOperationCallback {
INSTANCE;
@Override
@@ -42,12 +42,12 @@ public enum NoOpOperationCallback implements IModificationOperationCallback, ISe
@Override
public void before(ITupleReference tuple) {
- // Do nothing.
+ // Do nothing.
}
@Override
public void found(ITupleReference before, ITupleReference after) {
- // Do nothing.
+ // Do nothing.
}
@Override
@@ -59,4 +59,9 @@ public enum NoOpOperationCallback implements IModificationOperationCallback, ISe
public void complete(ITupleReference tuple) throws HyracksDataException {
// Do nothing.
}
+
+ @Override
+ public void setOp(Operation op) throws HyracksDataException {
+ // Do nothing.
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
index 5f27835..b9792c2 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
@@ -19,51 +19,40 @@
package org.apache.hyracks.storage.am.common.tuples;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public class PermutingFrameTupleReference implements IFrameTupleReference {
- private IFrameTupleAccessor fta;
- private int tIndex;
- private int[] fieldPermutation;
-
- public void setFieldPermutation(int[] fieldPermutation) {
- this.fieldPermutation = fieldPermutation;
- }
-
- public void reset(IFrameTupleAccessor fta, int tIndex) {
- this.fta = fta;
- this.tIndex = tIndex;
- }
-
- @Override
- public IFrameTupleAccessor getFrameTupleAccessor() {
- return fta;
- }
-
- @Override
- public int getTupleIndex() {
- return tIndex;
- }
-
- @Override
- public int getFieldCount() {
- return fieldPermutation.length;
- }
-
- @Override
- public byte[] getFieldData(int fIdx) {
- return fta.getBuffer().array();
- }
-
- @Override
- public int getFieldStart(int fIdx) {
- return fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength()
- + fta.getFieldStartOffset(tIndex, fieldPermutation[fIdx]);
- }
-
- @Override
- public int getFieldLength(int fIdx) {
- return fta.getFieldLength(tIndex, fieldPermutation[fIdx]);
- }
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class PermutingFrameTupleReference extends FrameTupleReference {
+ private int[] fieldPermutation;
+
+ public PermutingFrameTupleReference(int[] fieldPermutation) {
+ this.fieldPermutation = fieldPermutation;
+ }
+
+ public PermutingFrameTupleReference() {
+ }
+
+ public void setFieldPermutation(int[] fieldPermutation) {
+ this.fieldPermutation = fieldPermutation;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return fieldPermutation.length;
+ }
+
+ @Override
+ public byte[] getFieldData(int fIdx) {
+ return fta.getBuffer().array();
+ }
+
+ @Override
+ public int getFieldStart(int fIdx) {
+ return fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength()
+ + fta.getFieldStartOffset(tIndex, fieldPermutation[fIdx]);
+ }
+
+ @Override
+ public int getFieldLength(int fIdx) {
+ return fta.getFieldLength(tIndex, fieldPermutation[fIdx]);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index ae3aa52..13c6949 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -271,13 +271,13 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
operationalComponents.clear();
switch (ctx.getOperation()) {
case UPDATE:
- case UPSERT:
case PHYSICALDELETE:
case FLUSH:
case DELETE:
operationalComponents.add(memoryComponents.get(cmc));
break;
case INSERT:
+ case UPSERT:
addOperationalMutableComponents(operationalComponents);
operationalComponents.addAll(immutableComponents);
break;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 08891c2..fa25524 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -54,8 +54,8 @@ public final class LSMBTreeOpContext implements ILSMIndexOperationContext {
public IndexOperation op;
public final MultiComparator cmp;
public final MultiComparator bloomFilterCmp;
- public final IModificationOperationCallback modificationCallback;
- public final ISearchOperationCallback searchCallback;
+ public IModificationOperationCallback modificationCallback;
+ public ISearchOperationCallback searchCallback;
private final List<ILSMComponent> componentHolder;
private final List<ILSMComponent> componentsToBeMerged;
private final List<ILSMComponent> componentsToBeReplicated;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 63e651a..4c4ed28 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -28,8 +28,8 @@ import org.apache.hyracks.storage.am.common.api.IndexException;
public interface ILSMHarness {
- public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException,
- IndexException;
+ public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple)
+ throws HyracksDataException, IndexException;
public boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
throws HyracksDataException, IndexException;
@@ -45,14 +45,14 @@ public interface ILSMHarness {
public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException;
- public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
- IndexException;
+ public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+ throws HyracksDataException, IndexException;
public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException;
- public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
- IndexException;
+ public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+ throws HyracksDataException, IndexException;
public void addBulkLoadedComponent(ILSMComponent index) throws HyracksDataException, IndexException;
@@ -62,5 +62,4 @@ public interface ILSMHarness {
LSMOperationType opType) throws HyracksDataException;
public void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException;
-
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index 08bcfee..99f981d 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -39,6 +39,6 @@ public interface ILSMIndexOperationContext extends IIndexOperationContext {
public void setSearchPredicate(ISearchPredicate searchPredicate);
public ISearchPredicate getSearchPredicate();
-
+
public List<ILSMComponent> getComponentsToBeReplicated();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
index b172864..d99c9e8 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -22,17 +22,19 @@ package org.apache.hyracks.storage.am.lsm.common.dataflow;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.common.IStorageManagerInterface;
import org.apache.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
@@ -49,18 +51,20 @@ public class LSMTreeIndexInsertUpdateDeleteOperatorDescriptor extends AbstractTr
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
- ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackProvider) {
+ ITupleFilterFactory tupleFilterFactory, INullWriterFactory nullWriterFactory,
+ IModificationOperationCallbackFactory modificationOpCallbackProvider,
+ ISearchOperationCallbackFactory searchOpCallbackProvider) {
super(spec, 1, 1, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, tupleFilterFactory, false,
- false, null,
- NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackProvider);
+ comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, tupleFilterFactory, false, false,
+ nullWriterFactory, NoOpLocalResourceFactoryProvider.INSTANCE, searchOpCallbackProvider,
+ modificationOpCallbackProvider);
this.fieldPermutation = fieldPermutation;
this.op = op;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new LSMIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
recordDescProvider, op);
}