You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ht...@apache.org on 2019/04/30 05:03:12 UTC

[asterixdb] branch master updated: [NO ISSUE][FUN] Introduce data generator function

This is an automated email from the ASF dual-hosted git repository.

htowaileb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 76e3cec  [NO ISSUE][FUN] Introduce data generator function
76e3cec is described below

commit 76e3cec95a6bf24d9a65e2634e608452d5efdb00
Author: Hussain Towaileb <Hu...@Gmail.com>
AuthorDate: Mon Apr 29 22:14:37 2019 +0300

    [NO ISSUE][FUN] Introduce data generator function
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Introduced a data generator function to generate data in accordance
    with the TPC Benchmark DS specifications.
    - The data generator function accepts 2 arguments, the table name and
    the scaling factor; this makes the same function reusable for all the
    desired tables and scaling factors to be generated.
    - Added test cases for the data generator functions.
    
    Change-Id: Idb6bd6f697628395c70008e6f730bc5ca403da5e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3360
    Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Till Westmann <ti...@apache.org>
---
 asterixdb/asterix-app/pom.xml                      |  23 +++
 .../app/function/TPCDSDataGeneratorDatasource.java |  63 ++++++++
 .../app/function/TPCDSDataGeneratorFunction.java   |  56 +++++++
 .../app/function/TPCDSDataGeneratorReader.java     | 163 +++++++++++++++++++++
 .../app/function/TPCDSDataGeneratorRewriter.java   | 121 +++++++++++++++
 .../asterix/util/MetadataBuiltinFunctions.java     |   7 +
 .../datagen_sf_1_all_tables.3.query.sqlpp          |  47 ++++++
 .../datagen_sf_1_invalid_table.3.query.sqlpp       |  24 +++
 .../datagen_sf_1_small_tables.3.query.sqlpp        |  36 +++++
 .../datagen_sf_1_all_tables.1.adm                  |   1 +
 .../datagen_sf_1_invalid_table.1.adm               |   0
 .../datagen_sf_1_small_tables.1.adm                |   1 +
 .../test/resources/runtimets/testsuite_sqlpp.xml   |  12 ++
 .../asterix/common/exceptions/ErrorCode.java       |   1 +
 .../src/main/resources/asx_errormsg/en.properties  |   1 +
 15 files changed, 556 insertions(+)

diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index c919856..e25c45d 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -676,5 +676,28 @@
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.teradata.tpcds</groupId>
+      <artifactId>tpcds</artifactId>
+      <version>1.2</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.testng</groupId>
+          <artifactId>testng</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.airlift</groupId>
+          <artifactId>airline</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.inject</groupId>
+          <artifactId>javax.inject</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java
new file mode 100644
index 0000000..d0ff67e
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorDatasource.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+/**
+ * This TPC-DS function is used to generate data in accordance with the specifications of the TPC Benchmark DS.
+ */
+public class TPCDSDataGeneratorDatasource extends FunctionDataSource {
+
+    private final String tableName;
+    private final double scalingFactor;
+
+    public TPCDSDataGeneratorDatasource(INodeDomain domain, String tableName, double scalingFactor)
+            throws AlgebricksException {
+        super(createDataSourceId(tableName, scalingFactor), domain);
+        this.tableName = tableName;
+        this.scalingFactor = scalingFactor;
+    }
+
+    /**
+     * Creates a DataSourceId that includes both the table name and the scaling factor. Calls with different
+     * parameters therefore get different DataSourceIds, otherwise the optimizer could understand them as a single
+     * function. Both parameters are needed since changing either one of them changes the generated data.
+     *
+     * @param tableName table name to be added as part of the DataSourceId
+     * @param scalingFactor scaling factor to be added as part of the DataSourceId
+     * @return A DataSourceId that's based on the function details and its parameters
+     */
+    private static DataSourceId createDataSourceId(String tableName, double scalingFactor) {
+        return new DataSourceId(TPCDSDataGeneratorRewriter.TPCDS_DATA_GENERATOR.getNamespace(),
+                TPCDSDataGeneratorRewriter.TPCDS_DATA_GENERATOR.getName() + "." + tableName + "." + scalingFactor);
+    }
+
+    @Override
+    protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations) {
+        return new TPCDSDataGeneratorFunction(locations, tableName, scalingFactor);
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java
new file mode 100644
index 0000000..4c52bca
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Datasource function that generates data in accordance with the specifications of the TPC Benchmark DS. The
+ * generation is split into one chunk per partition location, and each partition generates its own chunk.
+ */
+public class TPCDSDataGeneratorFunction extends AbstractDatasourceFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String tableName;
+    private final double scalingFactor;
+    private final int parallelism;
+
+    public TPCDSDataGeneratorFunction(AlgebricksAbsolutePartitionConstraint locations, String tableName,
+            double scalingFactor) {
+        super(locations);
+        // The TPC-DS generator's parallelism is matched with the number of partitions (one chunk per partition)
+        this.parallelism = locations.getLocations().length;
+        this.tableName = tableName;
+        this.scalingFactor = scalingFactor;
+    }
+
+    /**
+     * Creates the reader for the given partition; the partition index decides which chunk the reader generates.
+     */
+    @Override
+    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        return new TPCDSDataGeneratorReader(tableName, scalingFactor, parallelism, partition);
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
new file mode 100644
index 0000000..626dee3
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.teradata.tpcds.Results;
+import com.teradata.tpcds.Session;
+import com.teradata.tpcds.Table;
+
+/**
+ * Each partition runs a TPCDSDataGeneratorReader instance. The reader is passed the parallelism level (the number of
+ * partitions) and generates the chunk of data that corresponds to its own partition.
+ *
+ * Note: The data generator does not apply the parallelism unless at least 1,000,000 rows generation is requested (this
+ * depends on the scaling factor and the table for which the rows are being generated). This means that, regardless
+ * of the number of available partitions, if the minimum number of rows is not met, a single partition will generate
+ * all the rows while all the other partitions will generate 0 rows.
+ */
+public class TPCDSDataGeneratorReader extends FunctionReader {
+
+    private final int parallelism;
+    private final int chunkNumber;
+    private final String tableName;
+    private final double scalingFactor;
+    private Table selectedTable;
+    private final StringBuilder builder = new StringBuilder();
+    private final Iterator<List<List<String>>> dataGeneratorIterator;
+
+    public TPCDSDataGeneratorReader(String tableName, double scalingFactor, int parallelism, int partitionNumber)
+            throws HyracksDataException {
+        this.tableName = tableName;
+        this.scalingFactor = scalingFactor;
+        this.parallelism = parallelism;
+
+        /*
+         Since we already ensured the parallelism level for the TPC-DS matches the number of partitions we have, we
+         need a way to tell each partition which chunk to generate. Since each TPCDSDataGeneratorReader is receiving
+         the partition number that's running it, we use that as the chunk to be produced by the data
+         generator.
+         Note that the indexing for the partitions starts at position 0, but the data generator chunks start at 1,
+         so the chunk will always be the partitionNumber + 1
+         */
+        chunkNumber = partitionNumber + 1;
+
+        dataGeneratorIterator = getDataGeneratorIterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return dataGeneratorIterator.hasNext();
+    }
+
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        CharArrayRecord record = new CharArrayRecord();
+        record.append((formatRecord(dataGeneratorIterator.next())).toCharArray());
+        record.endRecord();
+        return record;
+    }
+
+    /**
+     * Create the data generator iterator with the specified properties passed to the session.
+     *
+     * @return A lazy iterator to generate the data based on the specified properties.
+     */
+    private Iterator<List<List<String>>> getDataGeneratorIterator() throws HyracksDataException {
+        selectedTable = getTableFromStringTableName(tableName);
+
+        // Create the session with the specified properties, the session also specifies the chunk to be generated
+        Session session = Session.getDefaultSession().withTable(selectedTable).withScale(scalingFactor)
+                .withParallelism(parallelism).withChunkNumber(chunkNumber);
+
+        // Construct the Results and Results iterator
+        Results results = Results.constructResults(selectedTable, session);
+        return results.iterator();
+    }
+
+    /**
+     * Gets the table matching the provided string table name, throws an exception if no table is returned.
+     *
+     * @param tableName String table name to search for.
+     * @return Table if found, throws an exception otherwise.
+     */
+    private Table getTableFromStringTableName(String tableName) throws HyracksDataException {
+
+        List<Table> matchedTables = Table.getBaseTables().stream()
+                .filter(table -> tableName.equalsIgnoreCase(table.getName())).collect(Collectors.toList());
+
+        // Ensure the table was found
+        if (matchedTables.size() != 1) {
+            throw new RuntimeDataException(ErrorCode.TPCDS_INVALID_TABLE_NAME, getIdentifier().getName(), tableName);
+        }
+
+        return matchedTables.get(0);
+    }
+
+    /**
+     * Builds the string record from the values generated by the data generator. The column name for each value is
+     * extracted from the table from which the data is being generated.
+     *
+     * Only the first list in {@code values} is used (presumably the row of the selected table; confirm against the
+     * tpcds Results iterator). A null column value is written as a bare {@code null} instead of the string "null",
+     * and quotes and backslashes inside values are escaped so that they cannot break the record.
+     *
+     * @param values List whose first element contains all the generated column values
+     *
+     * @return The built string record from the generated values
+     */
+    private String formatRecord(List<List<String>> values) {
+        // Clear the builder (This is faster than re-creating the builder each iteration)
+        builder.setLength(0);
+        builder.append('{');
+
+        List<String> row = values.get(0);
+        for (int counter = 0; counter < row.size(); counter++) {
+            // Emit the "," separator before every field except the first, so no trailing "," is produced
+            if (counter > 0) {
+                builder.append(',');
+            }
+            builder.append('"');
+            appendEscaped(selectedTable.getColumns()[counter].getName());
+            builder.append("\":");
+
+            String value = row.get(counter);
+            if (value == null) {
+                builder.append("null");
+            } else {
+                builder.append('"');
+                appendEscaped(value);
+                builder.append('"');
+            }
+        }
+
+        builder.append('}');
+        return builder.toString();
+    }
+
+    /**
+     * Appends the provided text to the builder, escaping double quotes and backslashes so that the text can be
+     * safely placed inside a double-quoted string of the record.
+     *
+     * @param text the text to append
+     */
+    private void appendEscaped(String text) {
+        for (int i = 0; i < text.length(); i++) {
+            char c = text.charAt(i);
+            if (c == '"' || c == '\\') {
+                builder.append('\\');
+            }
+            builder.append(c);
+        }
+    }
+
+    private FunctionIdentifier getIdentifier() {
+        return TPCDSDataGeneratorRewriter.TPCDS_DATA_GENERATOR;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java
new file mode 100644
index 0000000..fb871b9
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorRewriter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AFloat;
+import org.apache.asterix.om.base.AInt16;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AInt8;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.exceptions.TypeMismatchException;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+/**
+ * This TPC-DS function is used to generate data in accordance with the specifications of the TPC Benchmark DS.
+ *
+ * The data generator takes 2 arguments, both of which must be constants:
+ * - first argument: a valid table name.
+ * - second argument: the desired scaling factor.
+ */
+public class TPCDSDataGeneratorRewriter extends FunctionRewriter {
+
+    public static final FunctionIdentifier TPCDS_DATA_GENERATOR =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "tpcds-datagen", 2);
+    public static final TPCDSDataGeneratorRewriter INSTANCE = new TPCDSDataGeneratorRewriter(TPCDS_DATA_GENERATOR);
+
+    private TPCDSDataGeneratorRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    @Override
+    protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression function)
+            throws AlgebricksException {
+
+        UnnestingFunctionCallExpression functionCall = (UnnestingFunctionCallExpression) function;
+
+        // Extract the values; both arguments must be constants since they are needed to build the data source
+        IAObject tableNameArgumentValue = getConstantArgumentValue(functionCall, 0);
+        IAObject scalingFactorArgumentValue = getConstantArgumentValue(functionCall, 1);
+
+        // Get the arguments' types and validate them
+        IAType tableNameType = tableNameArgumentValue.getType();
+        IAType scalingFactorType = scalingFactorArgumentValue.getType();
+
+        if (tableNameType.getTypeTag() != ATypeTag.STRING) {
+            throw new TypeMismatchException(TPCDS_DATA_GENERATOR, 0, tableNameType.getTypeTag(), ATypeTag.STRING);
+        }
+
+        // Ensure the scaling factor can be promoted to double
+        if (!ATypeHierarchy.canPromote(scalingFactorType.getTypeTag(), ATypeTag.DOUBLE)) {
+            throw new TypeMismatchException(TPCDS_DATA_GENERATOR, 1, scalingFactorType.getTypeTag(), ATypeTag.TINYINT,
+                    ATypeTag.SMALLINT, ATypeTag.INTEGER, ATypeTag.BIGINT, ATypeTag.FLOAT, ATypeTag.DOUBLE);
+        }
+
+        // Convert whichever number type we received into double
+        double scalingFactor = getScalingFactor(scalingFactorArgumentValue);
+        String tableName = ((AString) tableNameArgumentValue).getStringValue();
+
+        return new TPCDSDataGeneratorDatasource(context.getComputationNodeDomain(), tableName, scalingFactor);
+    }
+
+    /**
+     * Returns the value of the constant argument at the provided position of the function call.
+     *
+     * @param function the function call expression
+     * @param index position (0-based) of the argument
+     *
+     * @return The constant value of the argument
+     * @throws CompilationException if the argument is not a constant expression with an Asterix constant value
+     */
+    private IAObject getConstantArgumentValue(AbstractFunctionCallExpression function, int index)
+            throws CompilationException {
+        ILogicalExpression argument = function.getArguments().get(index).getValue();
+        if (argument.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+            IAlgebricksConstantValue constantValue = ((ConstantExpression) argument).getValue();
+            if (constantValue instanceof AsterixConstantValue) {
+                return ((AsterixConstantValue) constantValue).getObject();
+            }
+        }
+        throw new CompilationException(ErrorCode.COMPILATION_ERROR, function.getSourceLocation(),
+                TPCDS_DATA_GENERATOR.getName() + " expects a constant value for argument " + (index + 1));
+    }
+
+    /**
+     * Converts whichever received number type (byte, int, float, ...) into a double value
+     *
+     * @param value IAObject containing the numerical value
+     *
+     * @return The double value of the IAObject
+     */
+    private double getScalingFactor(IAObject value) throws TypeMismatchException {
+        switch (value.getType().getTypeTag()) {
+            case TINYINT:
+                return ((AInt8) value).getByteValue();
+            case SMALLINT:
+                return ((AInt16) value).getShortValue();
+            case INTEGER:
+                return ((AInt32) value).getIntegerValue();
+            case BIGINT:
+                return ((AInt64) value).getLongValue();
+            case FLOAT:
+                return ((AFloat) value).getFloatValue();
+            case DOUBLE:
+                return ((ADouble) value).getDoubleValue();
+            default:
+                throw new TypeMismatchException(TPCDS_DATA_GENERATOR, 1, value.getType().getTypeTag(), ATypeTag.TINYINT,
+                        ATypeTag.SMALLINT, ATypeTag.INTEGER, ATypeTag.BIGINT, ATypeTag.FLOAT, ATypeTag.DOUBLE);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
index 94e1c4e..c708cd1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -27,6 +27,7 @@ import org.apache.asterix.app.function.FeedRewriter;
 import org.apache.asterix.app.function.JobSummariesRewriter;
 import org.apache.asterix.app.function.PingRewriter;
 import org.apache.asterix.app.function.StorageComponentsRewriter;
+import org.apache.asterix.app.function.TPCDSDataGeneratorRewriter;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.utils.RecordUtil;
 
@@ -58,6 +59,12 @@ public class MetadataBuiltinFunctions {
                 (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
         BuiltinFunctions.addUnnestFun(PingRewriter.PING, true);
         BuiltinFunctions.addDatasourceFunction(PingRewriter.PING, PingRewriter.INSTANCE);
+        // TPC-DS data generation function
+        BuiltinFunctions.addPrivateFunction(TPCDSDataGeneratorRewriter.TPCDS_DATA_GENERATOR,
+                (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
+        BuiltinFunctions.addUnnestFun(TPCDSDataGeneratorRewriter.TPCDS_DATA_GENERATOR, true);
+        BuiltinFunctions.addDatasourceFunction(TPCDSDataGeneratorRewriter.TPCDS_DATA_GENERATOR,
+                TPCDSDataGeneratorRewriter.INSTANCE);
         // Active requests function
         BuiltinFunctions.addFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS,
                 (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/datagen_sf_1_all_tables/datagen_sf_1_all_tables.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/datagen_sf_1_all_tables/datagen_sf_1_all_tables.3.query.sqlpp
new file mode 100644
index 0000000..aa0710e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/datagen_sf_1_all_tables/datagen_sf_1_all_tables.3.query.sqlpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/* Counts the rows generated for each TPC-DS table at scaling factor 1.0 (tpcds_datagen is a private function). */
+set `import-private-functions` `true`;
+
+select
+  (select value count(*) from tpcds_datagen("call_center", 1.0) as data)[0] as call_center,
+  (select value count(*) from tpcds_datagen("catalog_page", 1.0) as data)[0] as catalog_page,
+  (select value count(*) from tpcds_datagen("catalog_returns", 1.0) as data)[0] as catalog_returns,
+  (select value count(*) from tpcds_datagen("catalog_sales", 1.0) as data)[0] as catalog_sales,
+  (select value count(*) from tpcds_datagen("customer", 1.0) as data)[0] as customer,
+  (select value count(*) from tpcds_datagen("customer_address", 1.0) as data)[0] as customer_address,
+  (select value count(*) from tpcds_datagen("customer_demographics", 1.0) as data)[0] as customer_demographics,
+  (select value count(*) from tpcds_datagen("date_dim", 1.0) as data)[0] as date_dim,
+  (select value count(*) from tpcds_datagen("household_demographics", 1.0) as data)[0] as household_demographics,
+  (select value count(*) from tpcds_datagen("income_band", 1.0) as data)[0] as income_band,
+  (select value count(*) from tpcds_datagen("inventory", 1.0) as data)[0] as inventory,
+  (select value count(*) from tpcds_datagen("item", 1.0) as data)[0] as item,
+  (select value count(*) from tpcds_datagen("promotion", 1.0) as data)[0] as promotion,
+  (select value count(*) from tpcds_datagen("reason", 1.0) as data)[0] as reason,
+  (select value count(*) from tpcds_datagen("ship_mode", 1.0) as data)[0] as ship_mode,
+  (select value count(*) from tpcds_datagen("store", 1.0) as data)[0] as store,
+  (select value count(*) from tpcds_datagen("store_returns", 1.0) as data)[0] as store_returns,
+  (select value count(*) from tpcds_datagen("store_sales", 1.0) as data)[0] as store_sales,
+  (select value count(*) from tpcds_datagen("time_dim", 1.0) as data)[0] as time_dim,
+  (select value count(*) from tpcds_datagen("warehouse", 1.0) as data)[0] as warehouse,
+  (select value count(*) from tpcds_datagen("web_page", 1.0) as data)[0] as web_page,
+  (select value count(*) from tpcds_datagen("web_returns", 1.0) as data)[0] as web_returns,
+  (select value count(*) from tpcds_datagen("web_sales", 1.0) as data)[0] as web_sales,
+  (select value count(*) from tpcds_datagen("web_site", 1.0) as data)[0] as web_site
+;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/datagen_sf_1_invalid_table/datagen_sf_1_invalid_table.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/datagen_sf_1_invalid_table/datagen_sf_1_invalid_table.3.query.sqlpp
new file mode 100644
index 0000000..91666cf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/datagen_sf_1_invalid_table/datagen_sf_1_invalid_table.3.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/* Expected to fail: "invalid_table_name" does not match any TPC-DS table name. */
+set `import-private-functions` `true`;
+
+select
+  (select value count(*) from tpcds_datagen("invalid_table_name", 1.0) as data)[0] as invalid_table
+;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/datagen_sf_1_small_tables/datagen_sf_1_small_tables.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/datagen_sf_1_small_tables/datagen_sf_1_small_tables.3.query.sqlpp
new file mode 100644
index 0000000..6033f56
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/datagen_sf_1_small_tables/datagen_sf_1_small_tables.3.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+set `import-private-functions` `true`;
+
+select
+  (select value count(*) from tpcds_datagen("call_center", 1.0) as data)[0] as call_center,
+  (select value count(*) from tpcds_datagen("catalog_page", 1.0) as data)[0] as catalog_page,
+  (select value count(*) from tpcds_datagen("date_dim", 1.0) as data)[0] as date_dim,
+  (select value count(*) from tpcds_datagen("household_demographics", 1.0) as data)[0] as household_demographics,
+  (select value count(*) from tpcds_datagen("income_band", 1.0) as data)[0] as income_band,
+  (select value count(*) from tpcds_datagen("item", 1.0) as data)[0] as item,
+  (select value count(*) from tpcds_datagen("promotion", 1.0) as data)[0] as promotion,
+  (select value count(*) from tpcds_datagen("reason", 1.0) as data)[0] as reason,
+  (select value count(*) from tpcds_datagen("ship_mode", 1.0) as data)[0] as ship_mode,
+  (select value count(*) from tpcds_datagen("store", 1.0) as data)[0] as store,
+  (select value count(*) from tpcds_datagen("warehouse", 1.0) as data)[0] as warehouse,
+  (select value count(*) from tpcds_datagen("web_page", 1.0) as data)[0] as web_page,
+  (select value count(*) from tpcds_datagen("web_site", 1.0) as data)[0] as web_site
+;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/datagen_sf_1_all_tables/datagen_sf_1_all_tables.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/datagen_sf_1_all_tables/datagen_sf_1_all_tables.1.adm
new file mode 100644
index 0000000..aa6650a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/datagen_sf_1_all_tables/datagen_sf_1_all_tables.1.adm
@@ -0,0 +1 @@
+{ "call_center": 6, "catalog_page": 11718, "catalog_returns": 144067, "catalog_sales": 1441548, "customer": 100000, "customer_address": 50000, "customer_demographics": 1920800, "date_dim": 73049, "household_demographics": 7200, "income_band": 20, "inventory": 11745000, "item": 18000, "promotion": 300, "reason": 35, "ship_mode": 20, "store": 12, "store_returns": 287514, "store_sales": 2880404, "time_dim": 86400, "warehouse": 5, "web_page": 60, "web_returns": 71763, "web_sales": 719384, "w [...]
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/datagen_sf_1_invalid_table/datagen_sf_1_invalid_table.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/datagen_sf_1_invalid_table/datagen_sf_1_invalid_table.1.adm
new file mode 100644
index 0000000..e69de29
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/datagen_sf_1_small_tables/datagen_sf_1_small_tables.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/datagen_sf_1_small_tables/datagen_sf_1_small_tables.1.adm
new file mode 100644
index 0000000..5b15b88
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/datagen_sf_1_small_tables/datagen_sf_1_small_tables.1.adm
@@ -0,0 +1 @@
+{ "call_center": 6, "catalog_page": 11718, "date_dim": 73049, "household_demographics": 7200, "income_band": 20, "item": 18000, "promotion": 300, "reason": 35, "ship_mode": 20, "store": 12, "warehouse": 5, "web_page": 60, "web_site": 30 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 2c7452c..f046f74 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8848,6 +8848,18 @@
   </test-group>
   <test-group name="tpcds">
     <test-case FilePath="tpcds">
+      <compilation-unit name="datagen_sf_1_invalid_table">
+        <output-dir compare="Text">datagen_sf_1_invalid_table</output-dir>
+        <expected-error>Invalid value</expected-error>
+        <source-location>false</source-location>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="tpcds">
+      <compilation-unit name="datagen_sf_1_small_tables">
+        <output-dir compare="Text">datagen_sf_1_small_tables</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="tpcds">
       <compilation-unit name="query-ASTERIXDB-1580">
         <output-dir compare="Text">query-ASTERIXDB-1580</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index c9f1c48..0e81bf7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -74,6 +74,7 @@ public class ErrorCode {
     public static final int INTEGER_VALUE_EXPECTED = 39;
     public static final int NO_STATEMENT_PROVIDED = 40;
     public static final int REQUEST_CANCELLED = 41;
+    public static final int TPCDS_INVALID_TABLE_NAME = 42;
 
     public static final int UNSUPPORTED_JRE = 100;
 
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index ddaf271..a69a6cf 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -75,6 +75,7 @@
 39 = Expected integer value, got %1$s
 40 = No statement provided
 41 = Request %1$s has been cancelled
+42 = Invalid value: function %1$s input \"%2$s\" did not match any TPC-DS tables
 
 100 = Unsupported JRE: %1$s