Posted to issues@nifi.apache.org by "Roberto Garcia (Jira)" <ji...@apache.org> on 2020/09/14 03:43:00 UTC

[jira] [Commented] (NIFI-7785) CaptureChangeMySQL processor captures enum values as "INDEX of those values" from Mysql DB

    [ https://issues.apache.org/jira/browse/NIFI-7785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195173#comment-17195173 ] 

Roberto Garcia commented on NIFI-7785:
--------------------------------------

I use an ExecuteGroovyScript processor to replace the captured index with the enum value.

I added two dynamic properties to it:


||Property||Value||Comment||
|DBCPConnectionPoolName|SampleMySQLPool|the name of your DBCPConnectionPool 1.12.0 controller service|
|sqlCmd|SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\'','') enums FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name='sample' AND column_name='fruit'|this query gets the enum values of the column "fruit" as a comma-separated list|
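
The nested REPLACE calls in sqlCmd turn a column_type such as enum('apple','pears','orange') into apple,pears,orange. The script then splits that result on commas. Below is a minimal Java sketch of the same string handling, useful for checking a column definition outside NiFi. EnumDefinition and parse are hypothetical names. Like the SQL, the sketch assumes that no enum value contains a comma, a quote, a parenthesis or the text "enum".

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of NiFi): repeats the string handling of the sqlCmd query.
final class EnumDefinition {
    private EnumDefinition() {}

    // Turns a column_type such as enum('apple','pears','orange') into [apple, pears, orange].
    static List<String> parse(String columnType) {
        String stripped = columnType
                .replace("enum", "")  // REPLACE(column_type,'enum','')
                .replace(")", "")     // REPLACE(...,')','')
                .replace("(", "")     // REPLACE(...,'(','')
                .replace("'", "");    // REPLACE(...,'\'','')
        // Same as enums.split(',') in the Groovy script
        return Arrays.asList(stripped.split(","));
    }

    public static void main(String[] args) {
        System.out.println(parse("enum('apple','pears','orange')")); // prints [apple, pears, orange]
    }
}
```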

 
finally here my Groovy Script:
{code:java}
import java.nio.charset.StandardCharsets
import java.util.regex.Matcher
import groovy.sql.Sql
import org.apache.commons.io.IOUtils
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback

// Take one FlowFile (one CDC event); nothing to do if the queue is empty
def flowFile = session.get()
if (!flowFile) return

// ExecuteGroovyScript dynamic properties
def serviceName = DBCPConnectionPoolName.value
def sqlCmdString = sqlCmd.value
// Get the controller service lookup from the context
def lookup = context.controllerServiceLookup
// Search for serviceName among the controller services
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == serviceName
}
if (!dbcpServiceId) {
    log.error("Controller service not found: " + serviceName)
    session.transfer(flowFile, REL_FAILURE)
    return
}
// Get the service from its id and connect to it
def service = lookup.getControllerService(dbcpServiceId)
def conn = service.getConnection()
if (!conn) {
    log.error("Failed to connect to " + serviceName)
    session.transfer(flowFile, REL_FAILURE)
    return
}
// Matches the numeric value of column 2 ("fruit"); expects compact JSON such as {"id":2,"value":1}
def enumPattern = /(?s)(?<="id":2,"value":)(\d+?)(?=})/
try {
    // Enum definition as a comma-separated list, e.g. "apple,pears,orange"
    def row = new Sql(conn).firstRow(sqlCmdString)
    if (!row?.enums) {
        throw new IllegalStateException("No enum definition returned by: " + sqlCmdString)
    }
    def enumArr = row.enums.toString().trim().split(',')
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8).trim()
        String cdcConverted = content
        Matcher regexMatcher = content =~ enumPattern
        if (regexMatcher.find()) {
            int index = regexMatcher.group(1).toInteger()
            // MySQL numbers ENUM members from 1 (0 is the empty-string error value)
            if (index < 1 || index > enumArr.length) {
                throw new IllegalArgumentException("Enum index out of range: " + index)
            }
            def enumText = enumArr[index - 1]
            cdcConverted = content.replaceAll(enumPattern, Matcher.quoteReplacement('"' + enumText + '"'))
        }
        outputStream.write(cdcConverted.getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error("Scripting error for query " + sqlCmdString, e)
    session.transfer(flowFile, REL_FAILURE)
} finally {
    // Release the connection; this is important as it would otherwise block new executions
    conn.close()
}
{code}
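
The core of the script is a regex substitution on the event JSON. Below is a minimal Java sketch of that step, which can be tested outside NiFi. EnumIndexResolver and resolve are hypothetical names. Like the script, the sketch makes three assumptions:
* The enum column is column id 2.
* The event JSON is compact, with "value" directly after "id".
* Indexes are 1-based, because MySQL numbers ENUM members starting at 1.

The sample events in the issue show 0 and 1, so check which base your own events use. The sketch resolves each match on its own and leaves the event unchanged when the pattern does not match.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of NiFi): the substitution step of the Groovy script.
final class EnumIndexResolver {
    // Same pattern as the script: the numeric value of the column whose id is 2.
    private static final Pattern ENUM_VALUE =
            Pattern.compile("(?s)(?<=\"id\":2,\"value\":)(\\d+?)(?=})");

    private EnumIndexResolver() {}

    static String resolve(String cdcJson, List<String> enumValues) {
        Matcher m = ENUM_VALUE.matcher(cdcJson);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            int index = Integer.parseInt(m.group(1));
            // Assumes 1-based indexes; 0 is MySQL's empty-string error value.
            if (index < 1 || index > enumValues.size()) {
                throw new IllegalArgumentException("Enum index out of range: " + index);
            }
            String text = "\"" + enumValues.get(index - 1) + "\"";
            // quoteReplacement keeps '$' or '\' in an enum value from being read as group references
            m.appendReplacement(out, Matcher.quoteReplacement(text));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String columns = "[{\"id\":1,\"value\":139},{\"id\":2,\"value\":2}]";
        // prints [{"id":1,"value":139},{"id":2,"value":"pears"}]
        System.out.println(resolve(columns, Arrays.asList("apple", "pears", "orange")));
    }
}
```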
 

> CaptureChangeMySQL processor captures enum values as "INDEX of those values" from Mysql DB
> -------------------------------------------------------------------------------------------
>
>                 Key: NIFI-7785
>                 URL: https://issues.apache.org/jira/browse/NIFI-7785
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Tools and Build
>    Affects Versions: 1.11.4
>         Environment: Ubuntu EC2 instance with 8 GB ram
>            Reporter: zeyk
>            Priority: Major
>              Labels: features
>
> The CaptureChangeMySQL processor captures enum values as the "INDEX of those values" rather than the values specified.
> For example:
> A table has the columns (id int, fruit enum('apple','pears','orange'), price int).
> After these inserts:
> insert into sample values (1,'apple',45)
> insert into sample values (2,'pears',56)
> I used the CaptureChangeMySQL processor to capture the CDC changes. It captures the inserts, but records the enum column alone by its index, as in the samples below:
> For the 1st insert:
>  
> {
>  "type":"insert",
>  "timestamp":1599004442000,
>  "binlog_filename":"mysql-bin-changelog.000039",
>  "binlog_position":1537835,
>  "database":"sample",
>  "table_name":"sample",
>  "table_id":82,
>  "columns":[
>  {
>  "id":1,
>  "name":"id",
>  "column_type":-5,
>  "value":139
>  },
>  {
>  "id":2,
>  "name":"fruit",
>  "column_type":12,
>  "value":0
>  },
>  {
>  "id":3,
>  "name":"price",
>  "column_type":12,
>  "value":45
>  }
>  ]
> }
>  
> For the 2nd insert:
>  
> {
>  "type":"insert",
>  "timestamp":1599004442000,
>  "binlog_filename":"mysql-bin-changelog.000039",
>  "binlog_position":1537835,
>  "database":"sample",
>  "table_name":"sample",
>  "table_id":82,
>  "columns":[
>  {
>  "id":1,
>  "name":"id",
>  "column_type":-5,
>  "value":139
>  },
>  {
>  "id":2,
>  "name":"fruit",
>  "column_type":12,
>  "value":1
>  },
>  {
>  "id":3,
>  "name":"price",
>  "column_type":12,
>  "value":56
>  }
>  ]
> }
>  
>  
> So the events above have 0 and 1 in place of apple and pears, respectively.
>  
> Could any of you help me with this, if you have faced a similar issue?
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)