You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/03/18 06:54:30 UTC
[12/18] incubator-apex-malhar git commit: APEXMALHAR-1920 #resolve
#comment Add JDBC dimension output operator to Malhar
APEXMALHAR-1920 #resolve #comment Add JDBC dimension output operator to Malhar
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/6937c20b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/6937c20b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/6937c20b
Branch: refs/heads/devel-3
Commit: 6937c20b100f57583ecc9fbcefc2fc77a1076fae
Parents: 5cecce4
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Sun Feb 21 20:10:54 2016 +0530
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Mar 15 01:59:01 2016 -0700
----------------------------------------------------------------------
.../db/jdbc/JDBCDimensionalOutputOperator.java | 464 +++++++++++++++++++
1 file changed, 464 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6937c20b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
new file mode 100644
index 0000000..3021521
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
+import com.datatorrent.lib.dimensions.DimensionsDescriptor;
+import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
+import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey;
+import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+
+/**
+ * Output operator that writes dimensional {@link Aggregate}s to a MySQL
+ * database through JDBC. Updates are written in the following fashion: <br/>
+ * <br/>
+ * <ol>
+ * <li>Aggregates are received from an upstream dimensions operator (for
+ * example {@code AbstractDimensionsComputationFlexibleSingleSchema} or
+ * {@code DimensionsStoreHDHT}).</li>
+ * <li>Each aggregate is written to a table chosen by its dimensions descriptor
+ * id (dimension combination and time bucket) and its aggregator id, as
+ * configured through {@link #setTableNames(Map)}.</li>
+ * <li>Rows are written with
+ * {@code INSERT ... ON DUPLICATE KEY UPDATE}, so an aggregate whose key
+ * already exists overwrites the stored aggregate columns.</li>
+ * </ol>
+ * Tuples are buffered and flushed to the database once {@code batchSize}
+ * tuples have accumulated, and again at the end of every window.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class JDBCDimensionalOutputOperator
+    extends AbstractPassThruTransactionableStoreOutputOperator<Aggregate, JdbcTransactionalStore>
+{
+  /**
+   * Number of tuples accumulated before a batch is flushed when no explicit
+   * batch size is configured.
+   */
+  protected static final int DEFAULT_BATCH_SIZE = 1000;
+
+  @Min(1)
+  private int batchSize;
+  /** Tuples received in the current window; cleared in {@link #endWindow()}. */
+  private final List<Aggregate> tuples;
+
+  /** Index into {@link #tuples} of the first tuple that has not been flushed yet. */
+  private transient int batchStartIdx;
+
+  @NotNull
+  private Map<Integer, Map<String, String>> tableNames;
+  @NotNull
+  private String eventSchema;
+  @NotNull
+  private AggregatorRegistry aggregatorRegistry = AggregatorRegistry.DEFAULT_AGGREGATOR_REGISTRY;
+  private DimensionalConfigurationSchema schema;
+
+  /** Prepared insert statements keyed by dimensions descriptor id, then aggregator id. */
+  private transient Map<Integer, Map<Integer, PreparedStatement>> ddIDToAggIDToStatement = Maps.newHashMap();
+
+  public JDBCDimensionalOutputOperator()
+  {
+    tuples = Lists.newArrayList();
+    batchSize = DEFAULT_BATCH_SIZE;
+    batchStartIdx = 0;
+    store = new JdbcTransactionalStore();
+  }
+
+  /**
+   * Builds the {@link DimensionalConfigurationSchema} from the configured
+   * event schema and prepares one insert statement for every configured
+   * (dimensions descriptor id, aggregator name) pair.
+   *
+   * @param context
+   *          The operator context, forwarded to the parent class.
+   * @throws NullPointerException
+   *           if a dimensions descriptor id has no table mapping, if a
+   *           configured aggregator name is not a known incremental
+   *           aggregator, or if the schema has no aggregate descriptor for it.
+   * @throws RuntimeException
+   *           wrapping the {@link SQLException} if a statement cannot be
+   *           prepared.
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    LOG.info("Done setting up super");
+    aggregatorRegistry.setup();
+
+    //Create prepared statements
+    schema = new DimensionalConfigurationSchema(eventSchema, aggregatorRegistry);
+
+    List<FieldsDescriptor> keyFDs = schema.getDimensionsDescriptorIDToKeyDescriptor();
+
+    for (int ddID = 0; ddID < keyFDs.size(); ddID++) {
+      LOG.info("ddID {}", ddID);
+
+      Map<String, String> aggNameToTable = Preconditions.checkNotNull(tableNames.get(ddID),
+          "No table names configured for dimensions descriptor id %s", ddID);
+      Int2ObjectMap<FieldsDescriptor> aggIDToAggFD = schema
+          .getDimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor().get(ddID);
+
+      // Work on a copy: getFieldList() may hand out the descriptor's backing
+      // list, and the schema's descriptors must not be modified from here.
+      // The time bucket is excluded from the column list; setParams skips it
+      // in the same way when binding values.
+      List<String> keyNames = Lists.newArrayList(keyFDs.get(ddID).getFieldList());
+      keyNames.remove(DimensionsDescriptor.DIMENSION_TIME_BUCKET);
+
+      Map<Integer, PreparedStatement> aggIDToStatement = ddIDToAggIDToStatement.get(ddID);
+
+      if (aggIDToStatement == null) {
+        aggIDToStatement = Maps.newHashMap();
+        ddIDToAggIDToStatement.put(ddID, aggIDToStatement);
+      }
+
+      for (Map.Entry<String, String> aggTable : aggNameToTable.entrySet()) {
+        Integer registeredAggID = aggregatorRegistry.getIncrementalAggregatorNameToID().get(aggTable.getKey());
+        Preconditions.checkNotNull(registeredAggID,
+            "Unknown incremental aggregator %s configured for dimensions descriptor id %s",
+            aggTable.getKey(), ddID);
+        int aggID = registeredAggID;
+
+        LOG.info("aggID {}", aggID);
+        FieldsDescriptor aggFD = Preconditions.checkNotNull(aggIDToAggFD.get(aggID),
+            "The schema defines no aggregate descriptor for dimensions descriptor id %s and aggregator %s",
+            ddID, aggTable.getKey());
+
+        LOG.info("List fields {}", keyNames);
+        List<String> aggregateNames = aggFD.getFieldList();
+        LOG.info("List fields {}", aggregateNames);
+        String tableName = aggTable.getValue();
+
+        String statementString = buildStatement(tableName, keyNames, aggregateNames);
+
+        try {
+          aggIDToStatement.put(aggID, store.getConnection().prepareStatement(statementString));
+        } catch (SQLException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds the parameterized upsert statement for one table, of the form
+   * {@code INSERT INTO t (k1,k2,a1) VALUES (?,?,?) ON DUPLICATE KEY UPDATE a1=VALUES(a1)}.
+   * Table and column names are concatenated as-is because identifiers cannot
+   * be bound as parameters; they come from the operator configuration and the
+   * schema.
+   *
+   * @param tableName
+   *          The table to insert into.
+   * @param keyNames
+   *          The key column names, in binding order. May be empty.
+   * @param aggregateNames
+   *          The aggregate column names, in binding order. Must not be empty.
+   * @return The SQL text of the statement.
+   * @throws IllegalArgumentException
+   *           if {@code aggregateNames} is empty.
+   */
+  private String buildStatement(String tableName, List<String> keyNames, List<String> aggregateNames)
+  {
+    LOG.info("building statement");
+    Preconditions.checkArgument(!aggregateNames.isEmpty(),
+        "Cannot build an insert statement for table %s without aggregate columns", tableName);
+
+    // Keys first, then aggregates: the same order in which
+    // setStatementParameters binds the values.
+    List<String> columns = Lists.newArrayListWithCapacity(keyNames.size() + aggregateNames.size());
+    columns.addAll(keyNames);
+    columns.addAll(aggregateNames);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("INSERT INTO ");
+    sb.append(tableName);
+    sb.append(" (");
+    addList(sb, columns);
+    sb.append(") VALUES (");
+
+    for (int qCounter = 0; qCounter < columns.size(); qCounter++) {
+      if (qCounter > 0) {
+        sb.append(",");
+      }
+      sb.append("?");
+    }
+
+    sb.append(") ON DUPLICATE KEY UPDATE ");
+    addOnDuplicate(sb, aggregateNames);
+
+    return sb.toString();
+  }
+
+  /**
+   * Appends a comma separated list of {@code name=VALUES(name)} assignments.
+   * Appends nothing when {@code names} is empty.
+   *
+   * @param sb
+   *          The builder to append to.
+   * @param names
+   *          The column names to overwrite on a duplicate key.
+   */
+  private void addOnDuplicate(StringBuilder sb, List<String> names)
+  {
+    LOG.info("add Duplicate");
+    for (int index = 0; index < names.size(); index++) {
+      if (index > 0) {
+        sb.append(",");
+      }
+
+      String name = names.get(index);
+      sb.append(name);
+      sb.append("=");
+      sb.append("VALUES(");
+      sb.append(name);
+      sb.append(")");
+    }
+  }
+
+  /**
+   * Appends the given names separated by commas. Appends nothing when
+   * {@code names} is empty.
+   *
+   * @param sb
+   *          The builder to append to.
+   * @param names
+   *          The names to append.
+   */
+  private void addList(StringBuilder sb, List<String> names)
+  {
+    for (int index = 0; index < names.size(); index++) {
+      if (index > 0) {
+        sb.append(",");
+      }
+      sb.append(names.get(index));
+    }
+  }
+
+  /**
+   * This sets the table names that correspond to the dimensions combinations
+   * specified in your {@link DimensionalConfigurationSchema}. The structure of
+   * this property is as follows: <br/>
+   * <br/>
+   * <ol>
+   * <li>The first key is the dimensions descriptor id assigned to a dimension
+   * combination in your {@link DimensionalConfigurationSchema}. <br/>
+   * <br/>
+   * The dimensions descriptor id is determined by the following factors:
+   * <ul>
+   * <li>The dimensions combination specified in the
+   * {@link DimensionalConfigurationSchema}.</li>
+   * <li>The time buckets defined in your
+   * {@link DimensionalConfigurationSchema}.</li>
+   * </ul>
+   * The dimensions descriptor id is computed in the following way:
+   * <ol>
+   * <li>The first dimensions descriptor id is 0</li>
+   * <li>A dimension combination is selected</li>
+   * <li>A time bucket is selected</li>
+   * <li>The current dimension combination and time bucket pair is assigned a
+   * dimensions descriptor id</li>
+   * <li>The current dimensions descriptor id is incremented</li>
+   * <li>Steps 3 - 5 are repeated until all the time buckets are done</li>
+   * <li>Steps 2 - 6 are repeated until all the dimension combinations are done.
+   * </li>
+   * </ol>
+   * </li>
+   * <li>The second key is the name of an aggregation being performed for that
+   * dimensions combination.</li>
+   * <li>The value is the name of the output MySQL table</li>
+   * </ol>
+   *
+   * @param tableNames
+   *          The table names that correspond to the dimensions combinations
+   *          specified in your {@link DimensionalConfigurationSchema}. Must
+   *          not be null.
+   */
+  public void setTableNames(Map<Integer, Map<String, String>> tableNames)
+  {
+    this.tableNames = Preconditions.checkNotNull(tableNames);
+  }
+
+  /**
+   * Sets the JSON corresponding to the {@link DimensionalConfigurationSchema}
+   * which was set on the upstream dimensions operators (for example
+   * {@code AppDataSingleSchemaDimensionStoreHDHT} and
+   * {@code AbstractDimensionsComputationFlexibleSingleSchema}).
+   *
+   * @param eventSchema
+   *          The JSON corresponding to the
+   *          {@link DimensionalConfigurationSchema} used by the upstream
+   *          dimensions operators.
+   */
+  public void setEventSchema(String eventSchema)
+  {
+    this.eventSchema = eventSchema;
+  }
+
+  /**
+   * Sets the {@link AggregatorRegistry} that is used to determine what
+   * aggregators correspond to what ids.
+   *
+   * @param aggregatorRegistry
+   *          The {@link AggregatorRegistry} that is used to determine what
+   *          aggregators correspond to what ids.
+   */
+  public void setAggregatorRegistry(AggregatorRegistry aggregatorRegistry)
+  {
+    this.aggregatorRegistry = aggregatorRegistry;
+  }
+
+  /**
+   * Flushes the tuples that have not been written yet, delegates to the
+   * parent's end-of-window handling and then resets the buffer for the next
+   * window.
+   */
+  @Override
+  public void endWindow()
+  {
+    //Process any remaining tuples.
+    if (batchStartIdx < tuples.size()) {
+      processBatch();
+    }
+    super.endWindow();
+    tuples.clear();
+    batchStartIdx = 0;
+  }
+
+  /**
+   * Buffers the tuple and flushes the pending tuples once {@code batchSize}
+   * of them have accumulated.
+   *
+   * @param tuple
+   *          The aggregate to write.
+   */
+  @Override
+  public void processTuple(Aggregate tuple)
+  {
+    tuples.add(tuple);
+    if ((tuples.size() - batchStartIdx) >= batchSize) {
+      processBatch();
+    }
+  }
+
+  /**
+   * Adds every tuple that has not been flushed yet to the batch of its
+   * prepared statement and then executes the batches of all statements.
+   *
+   * @throws RuntimeException
+   *           wrapping the {@link SQLException} if a batch fails.
+   */
+  private void processBatch()
+  {
+    LOG.debug("start {} end {}", batchStartIdx, tuples.size());
+    try {
+      for (int i = batchStartIdx; i < tuples.size(); i++) {
+        setStatementParameters(tuples.get(i));
+      }
+
+      for (Map<Integer, PreparedStatement> aggIDToStatement : ddIDToAggIDToStatement.values()) {
+        for (PreparedStatement statement : aggIDToStatement.values()) {
+          statement.executeBatch();
+          statement.clearBatch();
+        }
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException("processing batch", e);
+    } finally {
+      // Everything currently buffered has been handed to the statements, also
+      // on failure, so the next batch starts after the last buffered tuple.
+      batchStartIdx = tuples.size();
+    }
+  }
+
+  /**
+   * Sets the parameters on the {@link java.sql.PreparedStatement} based on the
+   * values in the given {@link Aggregate} and adds them to the statement's
+   * batch.
+   *
+   * @param aggregate
+   *          The {@link Aggregate} whose values will be set on the
+   *          corresponding {@link java.sql.PreparedStatement}.
+   * @throws IllegalStateException
+   *           if no statement was prepared for the aggregate's dimensions
+   *           descriptor id and aggregator id, i.e. no table was configured
+   *           for that pair.
+   */
+  private void setStatementParameters(Aggregate aggregate)
+  {
+    EventKey eventKey = aggregate.getEventKey();
+
+    int ddID = eventKey.getDimensionDescriptorID();
+    int aggID = eventKey.getAggregatorID();
+
+    LOG.debug("Setting statement params {} {}", ddID, aggID);
+
+    FieldsDescriptor keyFD = schema.getDimensionsDescriptorIDToKeyDescriptor().get(ddID);
+    FieldsDescriptor aggFD = schema.getDimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor().get(ddID)
+        .get(aggID);
+
+    GPOMutable key = eventKey.getKey();
+    key.setFieldDescriptor(keyFD);
+
+    GPOMutable value = aggregate.getAggregates();
+    value.setFieldDescriptor(aggFD);
+
+    Map<Integer, PreparedStatement> aggIDToStatement = ddIDToAggIDToStatement.get(ddID);
+    PreparedStatement ps = aggIDToStatement == null ? null : aggIDToStatement.get(aggID);
+
+    if (ps == null) {
+      throw new IllegalStateException("No table is configured for dimensions descriptor id " + ddID
+          + " and aggregator id " + aggID);
+    }
+
+    try {
+      // JDBC parameter indices are 1-based; keys are bound first, aggregates
+      // continue where the keys stopped.
+      int qCounter = setParams(ps, key, 1, true);
+      setParams(ps, value, qCounter, false);
+      ps.addBatch();
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Binds the fields of the given {@link GPOMutable} to consecutive
+   * parameters of the prepared statement, skipping the time bucket field.
+   *
+   * @param ps
+   *          The {@link java.sql.PreparedStatement} which will do an insert
+   *          into the MySQL database.
+   * @param gpo
+   *          The {@link GPOMutable} object whose values need to be set in the
+   *          prepared statement.
+   * @param qCounter
+   *          The index of the first parameter to bind.
+   * @param isKey
+   *          Whether {@code gpo} holds key fields. Currently unused.
+   * @return The index of the next unbound parameter in the prepared statement.
+   * @throws SQLException
+   *           if a parameter cannot be set.
+   * @throws UnsupportedOperationException
+   *           if a field has a type that cannot be written.
+   */
+  private int setParams(PreparedStatement ps, GPOMutable gpo, int qCounter, boolean isKey) throws SQLException
+  {
+    FieldsDescriptor fd = gpo.getFieldDescriptor();
+
+    Map<String, Type> fieldToType = fd.getFieldToType();
+
+    for (String fieldName : fd.getFieldList()) {
+      if (fieldName.equals(DimensionsDescriptor.DIMENSION_TIME_BUCKET)) {
+        // The time bucket is not a column of the statement, so it does not
+        // consume a parameter index.
+        continue;
+      }
+
+      Type type = fieldToType.get(fieldName);
+
+      LOG.debug("Field Name {} {}", fieldName, qCounter);
+
+      switch (type) {
+        case BOOLEAN: {
+          ps.setByte(qCounter, (byte)(gpo.getFieldBool(fieldName) ? 1 : 0));
+          break;
+        }
+        case BYTE: {
+          ps.setByte(qCounter, gpo.getFieldByte(fieldName));
+          break;
+        }
+        case CHAR: {
+          ps.setString(qCounter, Character.toString(gpo.getFieldChar(fieldName)));
+          break;
+        }
+        case STRING: {
+          ps.setString(qCounter, gpo.getFieldString(fieldName));
+          break;
+        }
+        case SHORT: {
+          ps.setInt(qCounter, gpo.getFieldShort(fieldName));
+          break;
+        }
+        case INTEGER: {
+          ps.setInt(qCounter, gpo.getFieldInt(fieldName));
+          break;
+        }
+        case LONG: {
+          ps.setLong(qCounter, gpo.getFieldLong(fieldName));
+          break;
+        }
+        case FLOAT: {
+          ps.setFloat(qCounter, gpo.getFieldFloat(fieldName));
+          break;
+        }
+        case DOUBLE: {
+          ps.setDouble(qCounter, gpo.getFieldDouble(fieldName));
+          break;
+        }
+        default: {
+          throw new UnsupportedOperationException("The type: " + type + " is not supported.");
+        }
+      }
+
+      qCounter++;
+    }
+
+    return qCounter;
+  }
+
+  /**
+   * Sets the size of a batch operation.<br/>
+   * <b>Default:</b> {@value #DEFAULT_BATCH_SIZE}
+   *
+   * @param batchSize
+   *          size of a batch
+   */
+  public void setBatchSize(int batchSize)
+  {
+    this.batchSize = batchSize;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCDimensionalOutputOperator.class);
+}
+