Posted to issues@nifi.apache.org by "Roberto Garcia (Jira)" <ji...@apache.org> on 2020/09/14 13:12:00 UTC
[jira] [Comment Edited] (NIFI-7785) CaptureChangeMySQL processor captures enum values as "INDEX of those values" from Mysql DB"
[ https://issues.apache.org/jira/browse/NIFI-7785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195173#comment-17195173 ]
Roberto Garcia edited comment on NIFI-7785 at 9/14/20, 1:11 PM:
----------------------------------------------------------------
I use an "ExecuteGroovyScript" processor, to which I added two properties:
||Property||Value||Comment||
|DBCPConnectionPoolName|SampleMySQLPool|The name of your DBCPConnectionPool (1.12.0) controller service, configured with:
Database Connection URL: jdbc:mysql://localhost:3306/sample
Database Driver Class Name: com.mysql.cj.jdbc.Driver
Database Driver Location(s): /usr/share/java/mysql-connector-java-8.0.21.jar|
|sqlCmd|SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\'','') enums
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name='sample' AND column_name='fruit'|This query gets the enum values of your column "fruit" as one comma-separated string.|
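To illustrate what the sqlCmd query is doing, here is a minimal sketch in plain Java that pulls the labels out of a COLUMN_TYPE string such as enum('apple','pears','orange'). The class and method names (EnumColumnType, parseLabels) are made up for this example and are not part of NiFi or MySQL. It also assumes that a quote inside a label is reported as a doubled quote (''), which I have not verified against INFORMATION_SCHEMA.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only.
class EnumColumnType {
    // One single-quoted label; a doubled quote ('') inside a label is
    // assumed to be an escaped quote.
    private static final Pattern LABEL = Pattern.compile("'((?:[^']|'')*)'");

    /** Returns the labels in declaration order, e.g. [apple, pears, orange]. */
    static List<String> parseLabels(String columnType) {
        List<String> labels = new ArrayList<>();
        Matcher m = LABEL.matcher(columnType);
        while (m.find()) {
            // Undo the assumed '' escaping before storing the label.
            labels.add(m.group(1).replace("''", "'"));
        }
        return labels;
    }

    public static void main(String[] args) {
        System.out.println(parseLabels("enum('apple','pears','orange')"));
    }
}
```

Unlike stripping the quotes in SQL and splitting on commas, matching the quoted labels one by one keeps a label that itself contains a comma in one piece.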
Finally, here is my Groovy script:
{code:java}
import java.nio.charset.StandardCharsets
import java.util.regex.Matcher
import org.apache.commons.io.IOUtils
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback
import groovy.sql.Sql

// Processor properties
def serviceName = DBCPConnectionPoolName.value
def sqlCmdString = sqlCmd.value

// Nothing to do if no flow file is queued
def flowFile = session.get()
if (!flowFile) {
    return
}

// Get the controller service lookup from the context
def lookup = context.controllerServiceLookup
// Search the controller services for the one named serviceName
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == serviceName
}

def conn = null
try {
    // Get the service from its id and connect to it
    conn = lookup.getControllerService(dbcpServiceId).getConnection()
    if (!conn) {
        throw new IllegalStateException("Failed to connect to " + serviceName)
    }
    // Read the enum labels; the query returns them as one comma-separated string
    def row = new Sql(conn).firstRow(sqlCmdString)
    def enumArr = "${row.enums}".trim().split(',')
    // Matches the numeric value of the column object with "id":2, e.g. {"id":2,"value":1}
    def pattern = /(?s)(?<="id":2,"value":)(\d+?)(?=})/
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8).trim()
        def cdcConverted = content
        Matcher regexMatcher = content =~ pattern
        if (regexMatcher.find()) {
            // The captured value is treated as a 1-based index into the enum list
            def enumText = enumArr[regexMatcher.group(1).toInteger() - 1]
            cdcConverted = content.replaceAll(pattern, Matcher.quoteReplacement('"' + enumText + '"'))
        }
        outputStream.write(cdcConverted.getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Scripting error while running: ' + sqlCmdString, e)
    session.transfer(flowFile, REL_FAILURE)
} finally {
    // Release the connection; this is important, as it will otherwise block new executions
    conn?.close()
}
{code}
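The core of the script, replacing the captured index with its label, can be tried outside NiFi. Below is a minimal sketch in plain Java, not the processor's own code. EnumIndexReplacer and replaceIndex are names invented for this example. Like the script, it assumes the column object has the compact form {"id":2,"value":N}, with nothing between "id" and "value", and that N is a 1-based index. The sample events in the issue show 0 for the first label, so if your events are 0-based, drop the "- 1" and adjust the range check.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only.
class EnumIndexReplacer {
    // Digits that directly follow "id":2,"value": and precede the closing brace.
    // Limited to 9 digits so Integer.parseInt cannot overflow.
    private static final Pattern FRUIT_VALUE =
            Pattern.compile("(?<=\"id\":2,\"value\":)(\\d{1,9})(?=\\})");

    /** Replaces each matched index with its quoted label. */
    static String replaceIndex(String json, List<String> labels) {
        Matcher m = FRUIT_VALUE.matcher(json);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            int index = Integer.parseInt(m.group(1));
            // Assumption: 1-based index. Out-of-range values are left untouched.
            String replacement = (index >= 1 && index <= labels.size())
                    ? "\"" + labels.get(index - 1) + "\""
                    : m.group();
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String event = "{\"columns\":[{\"id\":1,\"value\":139},"
                + "{\"id\":2,\"value\":2},{\"id\":3,\"value\":56}]}";
        System.out.println(replaceIndex(event, Arrays.asList("apple", "pears", "orange")));
    }
}
```

With the sample event in main, only the value of the column with "id":2 changes, from 2 to "pears". The other columns are left alone because the lookbehind requires the literal "id":2 in front of the value.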
> CaptureChangeMySQL processor captures enum values as "INDEX of those values" from Mysql DB"
> -------------------------------------------------------------------------------------------
>
> Key: NIFI-7785
> URL: https://issues.apache.org/jira/browse/NIFI-7785
> Project: Apache NiFi
> Issue Type: Bug
> Components: Tools and Build
> Affects Versions: 1.11.4
> Environment: Ubuntu EC2 instance with 8 GB ram
> Reporter: zeyk
> Priority: Major
> Labels: features
>
> CaptureChangeMySQL processor captures enum values as "INDEX of those values" rather than the values specified.
> For example:
> A table has columns (id int, fruit enum ('apple','pears','orange'), price int)
> On doing an insert:
> insert into sample values (1,'apple',45)
> insert into sample values (2,'pears',56)
> I used the CaptureChangeMySQL processor to capture the CDC changes. The processor does capture them, but it records the enum column by its index rather than its value, as in the samples below:
> For the 1st insert:
>
> {
>   "type":"insert",
>   "timestamp":1599004442000,
>   "binlog_filename":"mysql-bin-changelog.000039",
>   "binlog_position":1537835,
>   "database":"sample",
>   "table_name":"sample",
>   "table_id":82,
>   "columns":[
>     {
>       "id":1,
>       "name":"id",
>       "column_type":-5,
>       "value":139
>     },
>     {
>       "id":2,
>       "name":"fruit",
>       "column_type":12,
>       "value":0
>     },
>     {
>       "id":3,
>       "name":"price",
>       "column_type":12,
>       "value":45
>     }
>   ]
> }
>
> For the 2nd insert:
>
> {
>   "type":"insert",
>   "timestamp":1599004442000,
>   "binlog_filename":"mysql-bin-changelog.000039",
>   "binlog_position":1537835,
>   "database":"sample",
>   "table_name":"sample",
>   "table_id":82,
>   "columns":[
>     {
>       "id":1,
>       "name":"id",
>       "column_type":-5,
>       "value":139
>     },
>     {
>       "id":2,
>       "name":"fruit",
>       "column_type":12,
>       "value":1
>     },
>     {
>       "id":3,
>       "name":"price",
>       "column_type":12,
>       "value":56
>     }
>   ]
> }
>
>
> So the above has 0 and 1 in place of apple and pears respectively.
>
> Could any of you help me with this, if you have faced a similar issue?
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)