You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/06/18 19:40:47 UTC

[GitHub] [nifi] readl1 commented on a change in pull request #2231: NIFI-4521 MS SQL CDC Processor

readl1 commented on a change in pull request #2231:
URL: https://github.com/apache/nifi/pull/2231#discussion_r442460589



##########
File path: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mssql.processors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.cdc.CDCException;
+import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
+import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
+import org.apache.nifi.cdc.mssql.event.TableCapturePlan;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+@TriggerSerially
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"sql", "jdbc", "cdc", "mssql"})
+@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
+        + "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred. In a cluster, it is recommended to run "
+        + "this processor on primary only.")
+@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is stored by this processor, so "
+        + "that it can continue from the same point in time if restarted.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "tablename", description="Name of the table this changeset was captured from."),
+        @WritesAttribute(attribute="mssqlcdc.row.count", description="The number of rows in this changeset"),
+        @WritesAttribute(attribute="fullsnapshot", description="Whether this was a full snapshot of the base table or not.")})
+@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
+        + "timestamp for reading CDC data from MS SQL. Properties should be added in the format `initial.timestamp.{table_name}`, one for each table. "
+        + "This property is ignored after the first successful run for a table writes to the state manager, and is only used again if state is cleared.")
+public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor {
+    // Name prefix of the per-table dynamic properties (`initial.timestamp.{table_name}`)
+    // described in the @DynamicProperty annotation on this class.
+    public static final String INITIAL_TIMESTAMP_PROP_START = "initial.timestamp.";
+
+    // Required controller service (RecordSetWriterFactory) used to write out the records.
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing out the records")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    // Required controller service (DBCPService) that supplies the database connection.
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("cdcmssql-dbcp-service")
+            .displayName("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain connection to database")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    // Optional comma-delimited table list; Expression Language is enabled and, when set, the value must be non-empty.
+    public static final PropertyDescriptor CDC_TABLES = new PropertyDescriptor.Builder()
+            .name("cdcmssql-cdc-table-list")
+            .displayName("CDC Table List")
+            .description("The comma delimited list of tables in the source database to monitor for changes. If no tables "
+                    + "are specified the [cdc].[change_tables] table is queried for all of the available tables with change tracking enabled in the database.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    // Required "true"/"false" toggle, defaulting to "false", for taking an initial snapshot of the source table.
+    public static final PropertyDescriptor TAKE_INITIAL_SNAPSHOT = new PropertyDescriptor.Builder()
+            .name("cdcmssql-initial-snapshot")
+            .displayName("Generate an Initial Source Table Snapshot")
+            .description("Usually CDC only includes recent historic changes. Setting this property to true will cause a snapshot of the "
+                + "source table to be taken using the same schema as the CDC extracts. The snapshot time will be used as the starting point "
+                + "for extracting CDC changes.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    // Required non-negative integer (Expression Language enabled), defaulting to "0".
+    // NOTE(review): the property name says "full-snapshot-row-limit" while the display name is
+    // "Changeset Row Limit", and the description does not state what the default of 0 means
+    // (presumably "no limit") -- confirm against the code that consumes this value.
+    public static final PropertyDescriptor FULL_SNAPSHOT_ROW_LIMIT = new PropertyDescriptor.Builder()
+            .name("cdcmssql-full-snapshot-row-limit")
+            .displayName("Changeset Row Limit")
+            .description("If a very large change occurs on the source table, "
+                    + "the generated changeset may be too large to quickly merge into a destination system. "
+                    + "Use this property to set a cut-off point where instead of returning a changeset a full snapshot will be generated instead. "
+                    + "The fullsnapshot attribute will be set to true when this happens.")
+            .required(true)
+            .defaultValue("0")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    // Required "true"/"false" toggle, defaulting to "true", controlling whether pre-update rows are emitted.
+    public static final PropertyDescriptor INCLUDE_PREUPDATE_VALUES = new PropertyDescriptor.Builder()
+            .name("cdcmssql-include-preupdate-values")
+            .displayName("Include Pre-Update Values")
+            .description("When an update transaction occurs, should both the old values (operation=3) and the new values (operation=4) be returned. "
+                        + "If true, both rows will always be in the same FlowFile and be sequential records, old values followed by new values. "
+                        + "If you have unchangeable primary key(s) you do not need the old values to match-up the updated record to an old version, "
+                        + "and you could reduce the number of rows to process by setting this property to false.")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+
+    // The only relationship this processor declares in the visible code.
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Successfully created FlowFile from SQL query result set.")
+            .build();
+
+    // Supported property descriptors; assigned in init() as an unmodifiable list.
+    protected List<PropertyDescriptor> descriptors;
+    // Relationships returned by getRelationships(); assigned in init() as an unmodifiable set.
+    protected Set<Relationship> relationships;
+
+    // Concurrent cache of MSSQLTableInfo entries; onTrigger reads its key set.
+    // Keys are presumably table names -- confirm against the code that populates it.
+    protected final Map<String, MSSQLTableInfo> schemaCache = new ConcurrentHashMap<String, MSSQLTableInfo>();
+
+    // A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time
+    protected Map<String,String> maxValueProperties;
+    // Helper instance exposed to callers through getMssqlcdcUtils().
+    protected MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils();
+
+    /**
+     * Returns the {@link MSSQLCDCUtils} instance held by this processor.
+     *
+     * @return the current value of the {@code mssqlcdcUtils} field
+     */
+    public MSSQLCDCUtils getMssqlcdcUtils() {
+        return this.mssqlcdcUtils;
+    }
+
+    /**
+     * Builds the processor's relationship set and supported property list and
+     * stores both as unmodifiable collections.
+     *
+     * @param context the initialization context supplied by the framework (not read here)
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        // Relationships: this processor only routes to "success".
+        final Set<Relationship> rels = new HashSet<Relationship>();
+        rels.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(rels);
+
+        // Static properties, in the order they are registered.
+        final List<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>();
+        props.add(RECORD_WRITER);
+        props.add(DBCP_SERVICE);
+        props.add(CDC_TABLES);
+        props.add(TAKE_INITIAL_SNAPSHOT);
+        props.add(FULL_SNAPSHOT_ROW_LIMIT);
+        props.add(INCLUDE_PREUPDATE_VALUES);
+        this.descriptors = Collections.unmodifiableList(props);
+    }
+
+    /**
+     * Returns the relationships this processor can route FlowFiles to.
+     *
+     * @return the set stored in the {@code relationships} field
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Builds the descriptor for a user-supplied dynamic property.
+     *
+     * Only names beginning with {@link #INITIAL_TIMESTAMP_PROP_START} are accepted; any
+     * other name yields {@code null}. Accepted properties are optional, flagged as dynamic,
+     * and do not support Expression Language.
+     *
+     * @param propertyDescriptorName the name of the dynamic property being added
+     * @return a descriptor for the property, or {@code null} if the name lacks the required prefix
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        // Use the shared constant rather than repeating the literal, so the prefix
+        // accepted here cannot drift from the one declared on the class.
+        if (!propertyDescriptorName.startsWith(INITIAL_TIMESTAMP_PROP_START)) {
+            return null;
+        }
+
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .displayName(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .expressionLanguageSupported(false)
+                .dynamic(true)
+                .build();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException {
+        ProcessSession session = processSessionFactory.createSession();
+
+        final ComponentLog logger = getLogger();
+        final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final DBCPService dbcpService = processContext.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+
+        final boolean takeInitialSnapshot = processContext.getProperty(TAKE_INITIAL_SNAPSHOT).asBoolean();
+        final boolean includePreupdateValues = processContext.getProperty(INCLUDE_PREUPDATE_VALUES).asBoolean();
+        final int fullSnapshotRowLimit = processContext.getProperty(FULL_SNAPSHOT_ROW_LIMIT).evaluateAttributeExpressions().asInteger();
+
+        final String[] allTables = schemaCache.keySet().toArray(new String[schemaCache.size()]);
+
+        String[] tables = StringUtils
+                .split(processContext.getProperty(CDC_TABLES).evaluateAttributeExpressions().getValue(), ",");

Review comment:
       @mattyb149 @patricker Are we still trying to get this merged into master? Is there anything I can do from a testing standpoint to help that along?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org